1use super::*;
2use crate::SwarmBuilder;
3#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
4use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
5use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
6#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
7use libp2p_core::Transport;
8#[cfg(any(
9 all(not(target_arch = "wasm32"), feature = "websocket"),
10 feature = "relay"
11))]
12use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
13#[cfg(any(
14 all(not(target_arch = "wasm32"), feature = "websocket"),
15 feature = "relay"
16))]
17use libp2p_identity::PeerId;
18use std::marker::PhantomData;
19
/// Builder phase in which a websocket transport can be added.
///
/// The phase carries the transport handed to it; that transport is either combined with a
/// websocket transport (`with_websocket`) or passed on unchanged (`without_websocket`).
pub struct WebsocketPhase<T> {
    /// Transport held by the builder when it enters this phase.
    pub(crate) transport: T,
}
23
// Generates the `with_websocket` builder step for one runtime provider.
//
// Arguments, in order:
// * the Cargo feature name that gates the provider,
// * the provider marker type,
// * an awaitable expression yielding the DNS transport (layered over TCP) that the websocket
//   transport wraps; its `Err` value is converted into `WebsocketErrorInner::Dns`,
// * the stream type on which the security upgrade is negotiated.
macro_rules! impl_websocket_builder {
    ($providerKebabCase:literal, $providerPascalCase:ty, $dnsTcp:expr, $websocketStream:ty) => {
        #[cfg(all(
            not(target_arch = "wasm32"),
            feature = $providerKebabCase,
            feature = "websocket"
        ))]
        impl<T> SwarmBuilder<$providerPascalCase, WebsocketPhase<T>> {
            /// Adds a websocket transport to the builder and moves on to the relay phase.
            ///
            /// The websocket transport is authenticated with `security_upgrade`, multiplexed
            /// with `multiplexer_upgrade`, and then combined (via `or_transport`) with the
            /// transport already held by this phase.
            ///
            /// # Errors
            ///
            /// Returns a [`WebsocketError`] when the security upgrade cannot be built from the
            /// builder's keypair, or when setting up the underlying DNS transport fails.
            pub async fn with_websocket<
                SecUpgrade,
                SecStream,
                SecError,
                MuxUpgrade,
                MuxStream,
                MuxError,
            >(
                self,
                security_upgrade: SecUpgrade,
                multiplexer_upgrade: MuxUpgrade,
            ) -> Result<
                SwarmBuilder<
                    $providerPascalCase,
                    RelayPhase<impl AuthenticatedMultiplexedTransport>,
                >,
                WebsocketError<SecUpgrade::Error>,
            >
            where
                T: AuthenticatedMultiplexedTransport,

                // Security upgrade bounds.
                SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
                SecError: std::error::Error + Send + Sync + 'static,
                SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
                SecUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<$websocketStream>,
                        Output = (PeerId, SecStream),
                        Error = SecError,
                    > + Clone
                    + Send
                    + 'static,
                <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
                <<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

                // Multiplexer upgrade bounds.
                MuxStream: StreamMuxer + Send + 'static,
                MuxStream::Substream: Send + 'static,
                MuxStream::Error: Send + Sync + 'static,
                MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
                MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + OutboundConnectionUpgrade<
                        Negotiated<SecStream>,
                        Output = MuxStream,
                        Error = MuxError,
                    > + Clone
                    + Send
                    + 'static,
                <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
                MuxError: std::error::Error + Send + Sync + 'static,
                <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
                <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
            {
                // The security upgrade is derived from the keypair first; a failure here
                // returns before the DNS transport expression is awaited.
                let security_upgrade = security_upgrade
                    .into_security_upgrade(&self.keypair)
                    .map_err(WebsocketErrorInner::SecurityUpgrade)?;

                let dns_tcp_transport = $dnsTcp.await.map_err(WebsocketErrorInner::Dns)?;

                // Websocket over DNS/TCP, then authenticate, multiplex and box the muxer.
                let websocket_transport = libp2p_websocket::WsConfig::new(dns_tcp_transport)
                    .upgrade(libp2p_core::upgrade::Version::V1Lazy)
                    .authenticate(security_upgrade)
                    .multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
                    .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));

                // Combine with the transport already held by this phase.
                let transport = websocket_transport
                    .or_transport(self.phase.transport)
                    .map(|either, _| either.into_inner());

                Ok(SwarmBuilder {
                    keypair: self.keypair,
                    phantom: PhantomData,
                    phase: RelayPhase { transport },
                })
            }
        }
    };
}
115
116impl_websocket_builder!(
117 "async-std",
118 super::provider::AsyncStd,
119 libp2p_dns::async_std::Transport::system(libp2p_tcp::async_io::Transport::new(
120 libp2p_tcp::Config::default(),
121 )),
122 rw_stream_sink::RwStreamSink<
123 libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
124 >
125);
126impl_websocket_builder!(
127 "tokio",
128 super::provider::Tokio,
129 futures::future::ready(libp2p_dns::tokio::Transport::system(
132 libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default())
133 )),
134 rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
135);
136
137impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
138 pub(crate) fn without_websocket(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
139 SwarmBuilder {
140 keypair: self.keypair,
141 phantom: PhantomData,
142 phase: RelayPhase {
143 transport: self.phase.transport,
144 },
145 }
146 }
147}
148
// Shortcut: go from the websocket phase directly to the relay-client step.
#[cfg(feature = "relay")]
impl<T: AuthenticatedMultiplexedTransport, Provider> SwarmBuilder<Provider, WebsocketPhase<T>> {
    /// Skips the websocket step and adds a relay client.
    ///
    /// This is `without_websocket()` followed by the relay phase's `with_relay_client`,
    /// to which `security_upgrade` and `multiplexer_upgrade` are forwarded as-is.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the relay phase's `with_relay_client` returns.
    pub fn with_relay_client<SecUpgrade, SecStream, SecError, MuxUpgrade, MuxStream, MuxError>(
        self,
        security_upgrade: SecUpgrade,
        multiplexer_upgrade: MuxUpgrade,
    ) -> Result<
        SwarmBuilder<
            Provider,
            BandwidthLoggingPhase<
                impl AuthenticatedMultiplexedTransport,
                libp2p_relay::client::Behaviour,
            >,
        >,
        SecUpgrade::Error,
    >
    where
        // Security upgrade bounds.
        SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
        SecError: std::error::Error + Send + Sync + 'static,
        SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
        SecUpgrade::Upgrade: InboundConnectionUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            > + OutboundConnectionUpgrade<
                Negotiated<libp2p_relay::client::Connection>,
                Output = (PeerId, SecStream),
                Error = SecError,
            > + Clone
            + Send
            + 'static,
        <SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
        <SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
        <<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
        <<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,

        // Multiplexer upgrade bounds.
        MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
        MuxStream::Substream: Send + 'static,
        MuxStream::Error: Send + Sync + 'static,
        MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
        MuxUpgrade::Upgrade: InboundConnectionUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            > + OutboundConnectionUpgrade<
                Negotiated<SecStream>,
                Output = MuxStream,
                Error = MuxError,
            > + Clone
            + Send
            + 'static,
        <MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
        <MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
        MuxError: std::error::Error + Send + Sync + 'static,
        <<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
        <<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
    {
        let relay_phase_builder = self.without_websocket();
        relay_phase_builder.with_relay_client(security_upgrade, multiplexer_upgrade)
    }
}
#[cfg(feature = "metrics")]
impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
    /// Skips the websocket, relay and bandwidth-logging steps, then enables bandwidth
    /// metrics recorded into `registry`.
    ///
    /// This is `without_websocket()`, `without_relay()` and `without_bandwidth_logging()`
    /// applied in that order, followed by the resulting phase's `with_bandwidth_metrics`.
    pub fn with_bandwidth_metrics(
        self,
        registry: &mut libp2p_metrics::Registry,
    ) -> SwarmBuilder<
        Provider,
        BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
    > {
        let after_websocket = self.without_websocket();
        let after_relay = after_websocket.without_relay();
        let after_logging = after_relay.without_bandwidth_logging();
        after_logging.with_bandwidth_metrics(registry)
    }
}
204impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, WebsocketPhase<T>> {
205 pub fn with_behaviour<B, R: TryIntoBehaviour<B>>(
206 self,
207 constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
208 ) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
209 self.without_websocket()
210 .without_relay()
211 .without_bandwidth_logging()
212 .with_behaviour(constructor)
213 }
214}
215
/// Error returned by `with_websocket`.
///
/// Opaque public wrapper around the private [`WebsocketErrorInner`]; it is marked
/// `#[error(transparent)]`, so its `Display` and `source()` are those of the inner error.
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct WebsocketError<Sec>(#[from] WebsocketErrorInner<Sec>);
220
/// The concrete failure cases hidden behind [`WebsocketError`].
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[derive(Debug, thiserror::Error)]
enum WebsocketErrorInner<Sec> {
    /// Building the security upgrade from the keypair failed.
    // NOTE(review): the wrapped error is neither interpolated into the message nor marked
    // `#[source]`, so it appears to be reachable only through `Debug` — confirm this is
    // intended before changing it (adding `#[source]` would require `Sec: Error`).
    #[error("SecurityUpgrade")]
    SecurityUpgrade(Sec),
    /// Setting up the DNS transport failed with an I/O error.
    #[cfg(feature = "dns")]
    #[error("Dns")]
    Dns(#[from] std::io::Error),
}