Skip to content

Add IPFS usage metrics / extend logging / extend supported content path formats #6058

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use anyhow::{anyhow, Error};
use anyhow::{ensure, Context};
use graph::blockchain::{BlockPtr, TriggerWithHandler};
use graph::components::link_resolver::LinkResolverContext;
use graph::components::metrics::subgraph::SubgraphInstanceMetrics;
use graph::components::store::{EthereumCallCache, StoredDynamicDataSource};
use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError};
use graph::components::trigger_processor::RunnableTriggers;
use graph::data::subgraph::DeploymentHash;
use graph::data_source::common::{
CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedMappingABI,
};
Expand Down Expand Up @@ -1197,6 +1199,7 @@ pub struct UnresolvedDataSource {
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
manifest_idx: u32,
Expand All @@ -1210,7 +1213,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
context,
} = self;

let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
format!(
"failed to resolve data source {} with source_address {:?} and source_start_block {}",
name, source.address, source.start_block
Expand Down Expand Up @@ -1244,6 +1247,7 @@ pub struct DataSourceTemplate {
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
manifest_idx: u32,
Expand All @@ -1257,7 +1261,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
} = self;

let mapping = mapping
.resolve(resolver, logger)
.resolve(deployment_hash, resolver, logger)
.await
.with_context(|| format!("failed to resolve data source template {}", name))?;

Expand Down Expand Up @@ -1355,6 +1359,7 @@ impl FindMappingABI for Mapping {
impl UnresolvedMapping {
pub async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
) -> Result<Mapping, anyhow::Error> {
Expand All @@ -1377,13 +1382,17 @@ impl UnresolvedMapping {
abis.into_iter()
.map(|unresolved_abi| async {
Result::<_, Error>::Ok(Arc::new(
unresolved_abi.resolve(resolver, logger).await?,
unresolved_abi
.resolve(deployment_hash, resolver, logger)
.await?,
))
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>(),
async {
let module_bytes = resolver.cat(logger, &link).await?;
let module_bytes = resolver
.cat(LinkResolverContext::new(deployment_hash, logger), &link)
.await?;
Ok(Arc::new(module_bytes))
},
)
Expand Down
12 changes: 8 additions & 4 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use graph::anyhow::Context;
use graph::blockchain::{Block, TriggerWithHandler};
use graph::components::link_resolver::LinkResolverContext;
use graph::components::store::StoredDynamicDataSource;
use graph::components::subgraph::InstanceDSTemplateInfo;
use graph::data::subgraph::DataSourceContext;
use graph::data::subgraph::{DataSourceContext, DeploymentHash};
use graph::prelude::SubgraphManifestValidationError;
use graph::{
anyhow::{anyhow, Error},
Expand Down Expand Up @@ -330,6 +331,7 @@ pub struct UnresolvedDataSource {
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
Expand All @@ -343,7 +345,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
context,
} = self;

let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
format!(
"failed to resolve data source {} with source_account {:?} and source_start_block {}",
name, source.account, source.start_block
Expand All @@ -369,6 +371,7 @@ pub type DataSourceTemplate = BaseDataSourceTemplate<Mapping>;
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
Expand All @@ -381,7 +384,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
} = self;

let mapping = mapping
.resolve(resolver, logger)
.resolve(deployment_hash, resolver, logger)
.await
.with_context(|| format!("failed to resolve data source template {}", name))?;

Expand Down Expand Up @@ -432,6 +435,7 @@ pub struct UnresolvedMapping {
impl UnresolvedMapping {
pub async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
) -> Result<Mapping, Error> {
Expand All @@ -447,7 +451,7 @@ impl UnresolvedMapping {
let api_version = semver::Version::parse(&api_version)?;

let module_bytes = resolver
.cat(logger, &link)
.cat(LinkResolverContext::new(deployment_hash, logger), &link)
.await
.with_context(|| format!("failed to resolve mapping {}", link.link))?;

Expand Down
39 changes: 30 additions & 9 deletions chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error};
use graph::{
blockchain,
cheap_clone::CheapClone,
components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo},
components::{
link_resolver::{LinkResolver, LinkResolverContext},
subgraph::InstanceDSTemplateInfo,
},
data::subgraph::DeploymentHash,
prelude::{async_trait, BlockNumber, Link},
slog::Logger,
};
Expand Down Expand Up @@ -184,11 +188,17 @@ pub struct UnresolvedMapping {
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
async fn resolve(
self,
deployment_hash: &DeploymentHash,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSource, Error> {
let content = resolver.cat(logger, &self.source.package.file).await?;
let content = resolver
.cat(
LinkResolverContext::new(deployment_hash, logger),
&self.source.package.file,
)
.await?;

let mut package = graph::substreams::Package::decode(content.as_ref())?;

Expand Down Expand Up @@ -234,7 +244,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
let handler = match (self.mapping.handler, self.mapping.file) {
(Some(handler), Some(file)) => {
let module_bytes = resolver
.cat(logger, &file)
.cat(LinkResolverContext::new(deployment_hash, logger), &file)
.await
.with_context(|| format!("failed to resolve mapping {}", file.link))?;

Expand Down Expand Up @@ -314,6 +324,7 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate {
impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate {
async fn resolve(
self,
_deployment_hash: &DeploymentHash,
_resolver: &Arc<dyn LinkResolver>,
_logger: &Logger,
_manifest_idx: u32,
Expand All @@ -329,7 +340,7 @@ mod test {
use anyhow::Error;
use graph::{
blockchain::{DataSource as _, UnresolvedDataSource as _},
components::link_resolver::LinkResolver,
components::link_resolver::{LinkResolver, LinkResolverContext},
data::subgraph::LATEST_VERSION,
prelude::{async_trait, serde_yaml, JsonValueStream, Link},
slog::{o, Discard, Logger},
Expand Down Expand Up @@ -433,7 +444,10 @@ mod test {
let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap();
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
let logger = Logger::root(Discard, o!());
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
let ds: DataSource = ds
.resolve(&Default::default(), &link_resolver, &logger, 0)
.await
.unwrap();
let expected = DataSource {
kind: SUBSTREAMS_KIND.into(),
network: Some("mainnet".into()),
Expand Down Expand Up @@ -470,7 +484,10 @@ mod test {
serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap();
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
let logger = Logger::root(Discard, o!());
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
let ds: DataSource = ds
.resolve(&Default::default(), &link_resolver, &logger, 0)
.await
.unwrap();
let expected = DataSource {
kind: SUBSTREAMS_KIND.into(),
network: Some("mainnet".into()),
Expand Down Expand Up @@ -705,17 +722,21 @@ mod test {
unimplemented!()
}

async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
async fn cat(&self, _ctx: LinkResolverContext, _link: &Link) -> Result<Vec<u8>, Error> {
Ok(gen_package().encode_to_vec())
}

async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
async fn get_block(
&self,
_ctx: LinkResolverContext,
_link: &Link,
) -> Result<Vec<u8>, Error> {
unimplemented!()
}

async fn json_stream(
&self,
_logger: &Logger,
_ctx: LinkResolverContext,
_link: &Link,
) -> Result<JsonValueStream, Error> {
unimplemented!()
Expand Down
47 changes: 36 additions & 11 deletions core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ use anyhow::anyhow;
use anyhow::Error;
use bytes::Bytes;
use graph::futures03::future::BoxFuture;
use graph::ipfs::ContentPath;
use graph::ipfs::IpfsClient;
use graph::ipfs::RetryPolicy;
use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy};
use graph::{derive::CheapClone, prelude::CheapClone};
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
pub type IpfsService = Buffer<IpfsRequest, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

#[derive(Clone, Debug)]
pub struct IpfsRequest {
pub ctx: IpfsContext,
pub path: ContentPath,
}

pub fn ipfs_service(
client: Arc<dyn IpfsClient>,
Expand Down Expand Up @@ -43,7 +47,10 @@ struct IpfsServiceInner {
}

impl IpfsServiceInner {
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
async fn call_inner(
self,
IpfsRequest { ctx, path }: IpfsRequest,
) -> Result<Option<Bytes>, Error> {
let multihash = path.cid().hash().code();
if !SAFE_MULTIHASHES.contains(&multihash) {
return Err(anyhow!("CID multihash {} is not allowed", multihash));
Expand All @@ -52,6 +59,7 @@ impl IpfsServiceInner {
let res = self
.client
.cat(
ctx,
&path,
self.max_file_size,
Some(self.timeout),
Expand Down Expand Up @@ -126,14 +134,24 @@ mod test {

let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;

let client =
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
.unwrap();
let client = IpfsRpcClient::new_unchecked(
ServerAddress::local_rpc_api(),
Default::default(),
&graph::log::discard(),
)
.unwrap();

let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);

let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
let content = svc.oneshot(path).await.unwrap().unwrap();
let content = svc
.oneshot(IpfsRequest {
ctx: Default::default(),
path,
})
.await
.unwrap()
.unwrap();

assert_eq!(content.to_vec(), random_bytes);
}
Expand All @@ -157,7 +175,8 @@ mod test {
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";

let server = MockServer::start().await;
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
let ipfs_client =
IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
let path = ContentPath::new(CID).unwrap();

Expand All @@ -179,6 +198,12 @@ mod test {
.await;

// This means that we never reached the successful response.
ipfs_service.oneshot(path).await.unwrap_err();
ipfs_service
.oneshot(IpfsRequest {
ctx: Default::default(),
path,
})
.await
.unwrap_err();
}
}
Loading