libp2p/builder/phase/
websocket.rs

1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
6#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
7use libp2p_core::Transport;
8#[cfg(any(
9    all(not(target_arch = "wasm32"), feature = "websocket"),
10    feature = "relay"
11))]
12use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
13#[cfg(any(
14    all(not(target_arch = "wasm32"), feature = "websocket"),
15    feature = "relay"
16))]
17use libp2p_identity::PeerId;
18use std::marker::PhantomData;
19
20pub struct WebsocketPhase<T> {
21    pub(crate) transport: T,
22}
23
24macro_rules! impl_websocket_builder {
25    ($providerKebabCase:literal, $providerPascalCase:ty, $dnsTcp:expr, $websocketStream:ty) => {
26        /// Adds a websocket client transport.
27        ///
28        /// Note that both `security_upgrade` and `multiplexer_upgrade` take function pointers,
29        /// i.e. they take the function themselves (without the invocation via `()`), not the
30        /// result of the function invocation. See example below.
31        ///
32        /// ``` rust
33        /// # use libp2p::SwarmBuilder;
34        /// # use std::error::Error;
35        /// # async fn build_swarm() -> Result<(), Box<dyn Error>> {
36        /// let swarm = SwarmBuilder::with_new_identity()
37        ///     .with_tokio()
38        ///     .with_websocket(
39        ///         (libp2p_tls::Config::new, libp2p_noise::Config::new),
40        ///         libp2p_yamux::Config::default,
41        ///     )
42        ///     .await?
43        /// # ;
44        /// # Ok(())
45        /// # }
46        /// ```
47        #[cfg(all(not(target_arch = "wasm32"), feature = $providerKebabCase, feature = "websocket"))]
48        impl<T> SwarmBuilder<$providerPascalCase, WebsocketPhase<T>> {
49            pub async fn with_websocket<
50                SecUpgrade,
51                SecStream,
52                SecError,
53                MuxUpgrade,
54                MuxStream,
55                MuxError,
56            >(
57                self,
58                security_upgrade: SecUpgrade,
59                multiplexer_upgrade: MuxUpgrade,
60            ) -> Result<
61                SwarmBuilder<
62                    $providerPascalCase,
63                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
64                >,
65                WebsocketError<SecUpgrade::Error>,
66            >
67
68            where
69                T: AuthenticatedMultiplexedTransport,
70
71                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
72                SecError: std::error::Error + Send + Sync + 'static,
73                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
74                SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
75                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
76                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
77                <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
78                <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,
79
80                MuxStream: StreamMuxer + Send + 'static,
81                MuxStream::Substream: Send + 'static,
82                MuxStream::Error: Send + Sync + 'static,
83                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
84                MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
85                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
86                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
87                MuxError: std::error::Error + Send + Sync + 'static,
88                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
89                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
90
91            {
92                let security_upgrade = security_upgrade.into_security_upgrade(&self.keypair)
93                    .map_err(WebsocketErrorInner::SecurityUpgrade)?;
94                let websocket_transport = libp2p_websocket::WsConfig::new(
95                    $dnsTcp.await.map_err(WebsocketErrorInner::Dns)?,
96                )
97                    .upgrade(libp2p_core::upgrade::Version::V1Lazy)
98                    .authenticate(security_upgrade)
99                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
100                    .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
101
102                Ok(SwarmBuilder {
103                    keypair: self.keypair,
104                    phantom: PhantomData,
105                    phase: RelayPhase {
106                        transport: websocket_transport
107                            .or_transport(self.phase.transport)
108                            .map(|either, _| either.into_inner()),
109                    },
110                })
111            }
112        }
113    };
114}
115
116impl_websocket_builder!(
117    "async-std",
118    super::provider::AsyncStd,
119    libp2p_dns::async_std::Transport::system(libp2p_tcp::async_io::Transport::new(
120        libp2p_tcp::Config::default(),
121    )),
122    rw_stream_sink::RwStreamSink<
123        libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
124    >
125);
126impl_websocket_builder!(
127    "tokio",
128    super::provider::Tokio,
129    // Note this is an unnecessary await for Tokio Websocket (i.e. tokio dns) in order to be consistent
130    // with above AsyncStd construction.
131    futures::future::ready(libp2p_dns::tokio::Transport::system(
132        libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
133    )),
134    rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
135);
136
137impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
138    pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
139        SwarmBuilder {
140            keypair: self.keypair,
141            phantom: PhantomData,
142            phase: RelayPhase {
143                transport: self.phase.transport,
144            },
145        }
146    }
147}
148
149// Shortcuts
150#[cfg(feature = "relay")]
151impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
152    /// See [`SwarmBuilder::with_relay_client`].
153    pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
154        self,
155        security_upgrade: SecUpgrade,
156        multiplexer_upgrade: MuxUpgrade,
157    ) -> Result<
158        SwarmBuilder<
159            Provider,
160            BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
161        >,
162        SecUpgrade::Error,
163        > where
164
165        SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
166        SecError: std::error::Error + Send + Sync + 'static,
167        SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
168        SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
169    <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
170    <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
171    <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
172    <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,
173
174        MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
175        MuxStream::Substream: Send + 'static,
176        MuxStream::Error: Send + Sync + 'static,
177        MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
178        MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
179    <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
180    <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
181        MuxError: std::error::Error + Send + Sync + 'static,
182    <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
183    <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
184    {
185        self.without_websocket()
186            .with_relay_client(security_upgrade, multiplexer_upgrade)
187    }
188}
189#[cfg(feature = "metrics")]
190impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
191    pub fn with_bandwidth_metrics(
192        self,
193        registry: &mut libp2p_metrics::Registry,
194    ) -> SwarmBuilder<
195        Provider,
196        BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
197    > {
198        self.without_websocket()
199            .without_relay()
200            .without_bandwidth_logging()
201            .with_bandwidth_metrics(registry)
202    }
203}
204impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
205    pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
206        self,
207        constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
208    ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
209        self.without_websocket()
210            .without_relay()
211            .without_bandwidth_logging()
212            .with_behaviour(constructor)
213    }
214}
215
216#[derive(Debug, thiserror::Error)]
217#[error(transparent)]
218#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
219pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
220
221#[derive(Debug, thiserror::Error)]
222#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
223enum WebsocketErrorInner<Sec> {
224    #[error("SecurityUpgrade")]
225    SecurityUpgrade(Sec),
226    #[cfg(feature = "dns")]
227    #[error("Dns")]
228    Dns(#[from] std::io::Error),
229}