본문으로 건너뛰기

NEAR Lake 프레임워크로 마이그레이션

NEAR 인덱서 프레임워크를 꼭 사용해야만 하는 이유가 없는 모든 사용자는 NEAR Lake 프레임워크로의 마이그레이션을 고려하는 것이 좋습니다.

이 튜토리얼에서는 indexer-tx-watcher-example을 쇼케이스로 사용하여 프로젝트를 마이그레이션하는 방법을 보여줍니다.

소스 코드

마이그레이션된 인덱서의 소스 코드는 GitHub https://github.com/near-examples/indexer-tx-watcher-example-lake/tree/0.4.0에서 찾을 수 있습니다.

Diffs

문서 끝 부분에 참조용으로 diff를 게시했습니다. 인덱서를 마이그레이션하기 위해 diff만 필요한 경우, 아래로 스크롤하여 확인할 수 있습니다.

의존성(Dependency) 변경

우선 Cargo.toml 내 의존성부터 시작해봅시다.

src/Cargo.toml

[package]
name = "indexer-tx-watcher-example"
version = "0.1.0"
authors = ["Near Inc <hello@nearprotocol.com>"]
edition = "2018"

[dependencies]
actix = "=0.11.0-beta.2"
actix-rt = "=2.2.0" # remove it once actix is upgraded to 0.11+
base64 = "0.11"
clap = "3.0.0-beta.1"
openssl-probe = { version = "0.1.2" }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
tokio = { version = "1.1", features = ["sync"] }
tracing = "0.1.13"
tracing-subscriber = "0.2.4"

near-indexer = { git = "https://github.com/near/nearcore", rev = "25b000ae4dd9fe784695d07a3f2e99d82a6f10bd" }
  • edition2021로 업데이트
  • actix 크레이트 삭제
  • openssl-probe 크레이트 삭제
  • itertoolsfutures 추가
  • tokio 런타임을 사용하므로, tokio 기능 추가
  • tokio-stream 크레이트 추가
  • near-indexernear-lake-framework로 교체

결국 다음과 같은 형태가 될 것입니다.

src/Cargo.toml
[package]
name = "indexer-tx-watcher-example"
version = "0.1.0"
authors = ["Near Inc <hello@nearprotocol.com>"]
edition = "2021"

[dependencies]
base64 = "0.11"
clap = { version = "3.1.6", features = ["derive"] }
futures = "0.3.5"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
itertools = "0.9.0"
tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1" }
tracing = "0.1.13"
tracing-subscriber = "0.2.4"

near-lake-framework = "0.4.0"

clap config 변경

현재 우리는 RunInit 명령을 포함한 하위 명령이 있는 Opts 구조를 가지고 있습니다. NEAR Lake 프레임워크에는 data와 구성(config) 파일이 필요하지 않기 때문에, Init 역시 전혀 필요하지 않습니다. 따라서 일부 구조를 Opts로 합쳐야 합니다.

src/config.rs
...
/// NEAR Indexer Example
/// Watches for stream of blocks from the chain
#[derive(Clap, Debug)]
#[clap(version = "0.1", author = "Near Inc. <hello@nearprotocol.com>")]
pub(crate) struct Opts {
/// Sets a custom config dir. Defaults to ~/.near/
#[clap(short, long)]
pub home_dir: Option<std::path::PathBuf>,
#[clap(subcommand)]
pub subcmd: SubCommand,
}

#[derive(Clap, Debug)]
pub(crate) enum SubCommand {
/// Run NEAR Indexer Example. Start observe the network
Run(RunArgs),
/// Initialize necessary configs
Init(InitConfigArgs),
}

#[derive(Clap, Debug)]
pub(crate) struct RunArgs {
/// account ids to watch for
#[clap(long)]
pub accounts: String,
}

#[derive(Clap, Debug)]
pub(crate) struct InitConfigArgs {
...
}
...

다음과 같은 것들을 진행할 것입니다.

  • InitConfigArgs를 완전히 삭제
  • RunArgs에서 Opts로 내용물을 완전히 옮긴 뒤, RunArgs 삭제
  • Opts에서 home_dir 삭제
  • 인덱싱을 시작할 블록 높이를 알기 위해, Optsblock_height 추가
  • 변수에 대한 SubCommand 리팩터링 : 인덱싱할 체인을 정의하기 위한 메인넷 및 테스트넷
  • 나중을 위해 구조체에 Clone 추가
src/config.rs
/// NEAR Indexer Example
/// Watches for stream of blocks from the chain
#[derive(Clap, Debug, Clone)]
#[clap(version = "0.1", author = "Near Inc. <hello@nearprotocol.com>")]
pub(crate) struct Opts {
/// block height to start indexing from
#[clap(long)]
pub block_height: u64,
/// account ids to watch for
#[clap(long)]
pub accounts: String,
#[clap(subcommand)]
pub subcmd: SubCommand,
}

#[derive(Clap, Debug, Clone)]
pub(crate) enum SubCommand {
Mainnet,
Testnet,
}

파일 끝 부분에 교체해야 할 구현 사항이 하나 있습니다.

src/config.rs
...
impl From<InitConfigArgs> for near_indexer::InitConfigArgs {
...
}

near_lake_framework::LakeConfigOpts를 캐스팅할 수 있기를 원합니다. 따라서 아래와 같이 새롭게 구현할 것입니다.

src/config.rs
impl From<Opts> for near_lake_framework::LakeConfig {
fn from(opts: Opts) -> Self {
let mut lake_config =
near_lake_framework::LakeConfigBuilder::default().start_block_height(opts.block_height);

match &opts.subcmd {
SubCommand::Mainnet => {
lake_config = lake_config.mainnet();
}
SubCommand::Testnet => {
lake_config = lake_config.testnet();
}
};

lake_config.build().expect("Failed to build LakeConfig")
}
}

마지막 조치는 init_logging을 변경하여, 중복 로그 구독을 제거하는 제거하는 것입니다.

src/config.rs

...
pub(crate) fn init_logging() {
let env_filter = EnvFilter::new(
"tokio_reactor=info,near=info,stats=info,telemetry=info,indexer_example=info,indexer=info,near-performance-metrics=info",
);
tracing_subscriber::fmt::Subscriber::builder()
.with_env_filter(env_filter)
.with_writer(std::io::stderr)
.init();
}
...

이를 다음으로 교체합니다.

src/config.rs
...
pub(crate) fn init_logging() {
let env_filter = EnvFilter::new("near_lake_framework=info");
tracing_subscriber::fmt::Subscriber::builder()
.with_env_filter(env_filter)
.with_writer(std::io::stderr)
.init();
}
...

src/config.rs에서 할 일은 끝났고, src/main.rs을 수정해 봅시다.

인덱서 인스턴스화 교체

tokio 런타임을 사용할 수 있고, main 함수를 비동기적으로 작동하도록 만들 수 있기 때문에, 리팩토링 프로세스보다 main 함수 재생성을 위주로 보여드리도록 하겠습니다.

가져오기(import) 섹션부터 시작하겠습니다.

수정 전 Import

src/main.rs
use std::str::FromStr;

use std::collections::{HashMap, HashSet};

use clap::Clap;
use tokio::sync::mpsc;
use tracing::info;

use configs::{init_logging, Opts, SubCommand};

mod configs;

수정 후 Import

near_lake_framework import를 추가하고, configs로부터의 중복된 import를 삭제하였습니다.

src/main.rs
use std::str::FromStr;

use std::collections::{HashMap, HashSet};

use clap::Clap;
use tokio::sync::mpsc;
use tracing::info;

use near_lake_framework::near_indexer_primitives;
use near_lake_framework::LakeConfig;

use configs::{init_logging, Opts};

main() 생성

비동기 main() 함수를 만들어, init_logging을 호출하고 Opts를 읽습니다.

src/main.rs
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
init_logging();

let opts: Opts = Opts::parse();

Opts에서 LakeConfig를 캐스팅하고, NEAR Lake 프레임워크stream을 인스턴스화합니다.

src/main.rs
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
init_logging();

let opts: Opts = Opts::parse();

let config: LakeConfig = opts.clone().into();

let (_, stream) = near_lake_framework::streamer(config);

이전 main()으로부터 accounts 인자를 읽는 코드를 Vec<AccountId>로 복사/붙여넣기하세요.

src/main.rs
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
init_logging();

let opts: Opts = Opts::parse();

let config: LakeConfig = opts.clone().into();

let (_, stream) = near_lake_framework::streamer(config);

let watching_list = opts
.accounts
.split(',')
.map(|elem| {
near_indexer_primitives::types::AccountId::from_str(elem).expect("AccountId is invalid")
})
.collect();

이제 NEAR 인덱서 프레임워크 내 구축되었던 인덱서에서 이전에 사용했던 listen_blocks 함수를 호출할 수 있습니다. 그리고 main() 함수가 행복할 수 있도록 Ok(())를 반환하세요.

NEAR Lake 프레임워크 스트림 및 최종 main 함수

src/main.rs
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
init_logging();

let opts: Opts = Opts::parse();

let config: LakeConfig = opts.clone().into();

let (_, stream) = near_lake_framework::streamer(config);

let watching_list = opts
.accounts
.split(',')
.map(|elem| {
near_indexer_primitives::types::AccountId::from_str(elem).expect("AccountId is invalid")
})
.collect();

listen_blocks(stream, watching_list).await;

Ok(())
}

이제 끝났습니다. 이는 거의 완전한 main() 함수입니다. 이전 함수는 삭제하시면 됩니다.

자료형 관련 함수 내 변경 사항

NEAR Lake 프레임워크 출시와 함께, 인덱서용으로 생성된 구조를 별도의 크레이트로 추출했습니다. 이는 nearcore 내 의존성을 피하기 위해 수행되었는데, 이제 crates.io에 이미 게시된 별도의 크레이트 또는 해당 크레이트를 노출하는 NEAR Lake 프레임워크에 의존할 수 있기 때문입니다.

listen_blocks

자료형의 새 위치를 가리키도록 함수 서명을 변경해야 합니다.

src/main.rs
async fn listen_blocks(
mut stream: mpsc::Receiver<near_indexer::StreamerMessage>,
watching_list: Vec<near_indexer::near_primitives::types::AccountId>,
) {
src/main.rs
async fn listen_blocks(
mut stream: mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
watching_list: Vec<near_indexer_primitives::types::AccountId>,
) {

다음은 near_indexer::near_primitivesnear_indexer_primitives로 교체되어야 하는 세 가지 위치를 나타냅니다.

src/main.rs

if let near_indexer_primitives::views::ReceiptEnumView::Action {
src/main.rs
if let near_indexer_primitives::views::ReceiptEnumView::Action {
src/main.rs
if let near_indexer_primitives::views::ActionView::FunctionCall {

is_tx_receiver_watched()

그리고 함수 is_tx_receiver_watched()의 자료형에 대한 최종 변경 사항은 다음과 같습니다.

src/main.rs
fn is_tx_receiver_watched(
tx: &near_indexer_primitives::IndexerTransactionWithOutcome,
watching_list: &[near_indexer_primitives::types::AccountId],
) -> bool {
watching_list.contains(&tx.transaction.receiver_id)
}

자격 증명

NEAR Lake 프레임워크에서 데이터에 액세스하려면 자격 증명을 구성해야 합니다.

결론

이제 NEAR Lake 프레임워크 인덱서로 완전히 마이그레이션되었습니다.

참조용으로, 밑에 전체 diff를 게시하고 있습니다.

Diffs

Cargo.toml
--- a/Cargo.toml
+++ b/Cargo.toml

@@ -2,18 +2,18 @@

name = "indexer-tx-watcher-example"

version = "0.1.0"

authors = ["Near Inc <hello@nearprotocol.com>"]

-edition = "2018"

+edition = "2021"



[dependencies]

-actix = "=0.11.0-beta.2"

-actix-rt = "=2.2.0" # remove it once actix is upgraded to 0.11+

base64 = "0.11"

-clap = "3.0.0-beta.1"

-openssl-probe = { version = "0.1.2" }

+clap = { version = "3.1.6", features = ["derive"] }

+futures = "0.3.5"

serde = { version = "1", features = ["derive"] }

serde_json = "1.0.55"

-tokio = { version = "1.1", features = ["sync"] }

+itertools = "0.9.0"

+tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }

+tokio-stream = { version = "0.1" }

tracing = "0.1.13"

tracing-subscriber = "0.2.4"



-near-indexer = { git = "https://github.com/near/nearcore", rev = "25b000ae4dd9fe784695d07a3f2e99d82a6f10bd" }

+near-lake-framework = "0.4.0"
src/configs.rs
--- a/src/configs.rs

+++ b/src/configs.rs
@@ -1,99 +1,50 @@
-use clap::Clap;
+use clap::Parser;

use tracing_subscriber::EnvFilter;

/// NEAR Indexer Example
/// Watches for stream of blocks from the chain
-#[derive(Clap, Debug)]
+#[derive(Parser, Debug, Clone)]
#[clap(version = "0.1", author = "Near Inc. <hello@nearprotocol.com>")]
pub(crate) struct Opts {
- /// Sets a custom config dir. Defaults to ~/.near/
- #[clap(short, long)]
- pub home_dir: Option<std::path::PathBuf>,
- #[clap(subcommand)]
- pub subcmd: SubCommand,
-}
-
-#[derive(Clap, Debug)]
-pub(crate) enum SubCommand {
- /// Run NEAR Indexer Example. Start observe the network
- Run(RunArgs),
- /// Initialize necessary configs
- Init(InitConfigArgs),
-}
-
-#[derive(Clap, Debug)]
-pub(crate) struct RunArgs {
+ /// block height to start indexing from
+ #[clap(long)]
+ pub block_height: u64,
/// account ids to watch for
#[clap(long)]
pub accounts: String,
+ #[clap(subcommand)]
+ pub subcmd: SubCommand,
}

-#[derive(Clap, Debug)]
-pub(crate) struct InitConfigArgs {
- /// chain/network id (localnet, testnet, devnet, betanet)
- #[clap(short, long)]
- pub chain_id: Option<String>,
- /// Account ID for the validator key
- #[clap(long)]
- pub account_id: Option<String>,
- /// Specify private key generated from seed (TESTING ONLY)
- #[clap(long)]
- pub test_seed: Option<String>,
- /// Number of shards to initialize the chain with
- #[clap(short, long, default_value = "1")]
- pub num_shards: u64,
- /// Makes block production fast (TESTING ONLY)
- #[clap(short, long)]
- pub fast: bool,
- /// Genesis file to use when initialize testnet (including downloading)
- #[clap(short, long)]
- pub genesis: Option<String>,
- /// Download the verified NEAR genesis file automatically.
- #[clap(long)]
- pub download_genesis: bool,
- /// Specify a custom download URL for the genesis file.
- #[clap(long)]
- pub download_genesis_url: Option<String>,
- /// Download the verified NEAR config file automtically.
- #[clap(long)]
- pub download_config: bool,
- /// Specify a custom download URL for the config file.
- #[clap(long)]
- pub download_config_url: Option<String>,
- /// Specify the boot nodes to bootstrap the network
- #[clap(long)]
- pub boot_nodes: Option<String>,
- /// Specify a custom max_gas_burnt_view limit.
- #[clap(long)]
- pub max_gas_burnt_view: Option<u64>,
+#[derive(Parser, Debug, Clone)]
+pub(crate) enum SubCommand {
+ Mainnet,
+ Testnet,
}

pub(crate) fn init_logging() {
- let env_filter = EnvFilter::new(
- "tokio_reactor=info,near=info,stats=info,telemetry=info,indexer_example=info,indexer=info,near-performance-metrics=info",
- );
+ let env_filter = EnvFilter::new("near_lake_framework=info");
tracing_subscriber::fmt::Subscriber::builder()
.with_env_filter(env_filter)
.with_writer(std::io::stderr)
.init();
}

-impl From<InitConfigArgs> for near_indexer::InitConfigArgs {
- fn from(config_args: InitConfigArgs) -> Self {
- Self {
- chain_id: config_args.chain_id,
- account_id: config_args.account_id,
- test_seed: config_args.test_seed,
- num_shards: config_args.num_shards,
- fast: config_args.fast,
- genesis: config_args.genesis,
- download_genesis: config_args.download_genesis,
- download_genesis_url: config_args.download_genesis_url,
- download_config: config_args.download_config,
- download_config_url: config_args.download_config_url,
- boot_nodes: config_args.boot_nodes,
- max_gas_burnt_view: config_args.max_gas_burnt_view,
- }
+impl From<Opts> for near_lake_framework::LakeConfig {
+ fn from(opts: Opts) -> Self {
+ let mut lake_config =
+ near_lake_framework::LakeConfigBuilder::default().start_block_height(opts.block_height);
+
+ match &opts.subcmd {
+ SubCommand::Mainnet => {
+ lake_config = lake_config.mainnet();
+ }
+ SubCommand::Testnet => {
+ lake_config = lake_config.testnet();
+ }
+ };
+
+ lake_config.build().expect("Failed to build LakeConfig")
}
}
src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,11 +2,14 @@

use std::collections::{HashMap, HashSet};

-use clap::Clap;
+use clap::Parser;
use tokio::sync::mpsc;
use tracing::info;

-use configs::{init_logging, Opts, SubCommand};
+use near_lake_framework::near_indexer_primitives;
+use near_lake_framework::LakeConfig;
+
+use configs::{init_logging, Opts};

mod configs;

@@ -15,60 +18,34 @@
/// We want to catch all *successful* transactions sent to one of the accounts from the list.
/// In the demo we'll just look for them and log them but it might and probably should be extended based on your needs.

-fn main() {
- // We use it to automatically search the for root certificates to perform HTTPS calls
- // (sending telemetry and downloading genesis)
- openssl_probe::init_ssl_cert_env_vars();
+#[tokio::main]
+async fn main() -> Result<(), tokio::io::Error> {
init_logging();

let opts: Opts = Opts::parse();

- let home_dir = opts.home_dir.unwrap_or_else(near_indexer::get_default_home);
+ let config: LakeConfig = opts.clone().into();

- match opts.subcmd {
- SubCommand::Run(args) => {
- // Create the Vec of AccountId from the provided ``--accounts`` to pass it to `listen_blocks`
- let watching_list = args
- .accounts
- .split(',')
- .map(|elem| {
- near_indexer::near_primitives::types::AccountId::from_str(elem)
- .expect("AccountId is invalid")
- })
- .collect();
-
- // Inform about indexer is being started and what accounts we're watching for
- eprintln!(
- "Starting indexer transaction watcher for accounts: \n {:#?}",
- &args.accounts
- );
-
- // Instantiate IndexerConfig with hardcoded parameters
- let indexer_config = near_indexer::IndexerConfig {
- home_dir,
- sync_mode: near_indexer::SyncModeEnum::FromInterruption,
- await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync,
- };
+ let (_, stream) = near_lake_framework::streamer(config);

- // Boilerplate code to start the indexer itself
- let sys = actix::System::new();
- sys.block_on(async move {
- eprintln!("Actix");
- let indexer = near_indexer::Indexer::new(indexer_config);
- let stream = indexer.streamer();
- actix::spawn(listen_blocks(stream, watching_list));
- });
- sys.run().unwrap();
- }
- SubCommand::Init(config) => near_indexer::indexer_init_configs(&home_dir, config.into()),
- }
+ let watching_list = opts
+ .accounts
+ .split(',')
+ .map(|elem| {
+ near_indexer_primitives::types::AccountId::from_str(elem).expect("AccountId is invalid")
+ })
+ .collect();
+
+ listen_blocks(stream, watching_list).await;
+
+ Ok(())
}

/// The main listener function the will be reading the stream of blocks `StreamerMessage`
/// and perform necessary checks
async fn listen_blocks(
- mut stream: mpsc::Receiver<near_indexer::StreamerMessage>,
- watching_list: Vec<near_indexer::near_primitives::types::AccountId>,
+ mut stream: mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
+ watching_list: Vec<near_indexer_primitives::types::AccountId>,
) {
eprintln!("listen_blocks");
// This will be a map of correspondence between transactions and receipts
@@ -120,7 +97,7 @@
&execution_outcome.receipt.receiver_id,
execution_outcome.execution_outcome.outcome.status
);
- if let near_indexer::near_primitives::views::ReceiptEnumView::Action {
+ if let near_indexer_primitives::views::ReceiptEnumView::Action {
signer_id,
..
} = &execution_outcome.receipt.receipt
@@ -128,19 +105,20 @@
eprintln!("{}", signer_id);
}

- if let near_indexer::near_primitives::views::ReceiptEnumView::Action {
- actions,
- ..
+ if let near_indexer_primitives::views::ReceiptEnumView::Action {
+ actions, ..
} = execution_outcome.receipt.receipt
{
for action in actions.iter() {
- if let near_indexer::near_primitives::views::ActionView::FunctionCall {
+ if let near_indexer_primitives::views::ActionView::FunctionCall {
args,
..
} = action
{
if let Ok(decoded_args) = base64::decode(args) {
- if let Ok(args_json) = serde_json::from_slice::<serde_json::Value>(&decoded_args) {
+ if let Ok(args_json) =
+ serde_json::from_slice::<serde_json::Value>(&decoded_args)
+ {
eprintln!("{:#?}", args_json);
}
}
@@ -156,8 +134,8 @@
}

fn is_tx_receiver_watched(
- tx: &near_indexer::IndexerTransactionWithOutcome,
- watching_list: &[near_indexer::near_primitives::types::AccountId],
+ tx: &near_indexer_primitives::IndexerTransactionWithOutcome,
+ watching_list: &[near_indexer_primitives::types::AccountId],
) -> bool {
watching_list.contains(&tx.transaction.receiver_id)
}