tokio/runtime/
builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5    blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11use crate::runtime::blocking::BlockingPool;
12use crate::runtime::scheduler::CurrentThread;
13use std::fmt;
14use std::io;
15use std::thread::ThreadId;
16use std::time::Duration;
17
18/// Builds Tokio Runtime with custom configuration values.
19///
20/// Methods can be chained in order to set the configuration values. The
21/// Runtime is constructed by calling [`build`].
22///
23/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
24/// or [`Builder::new_current_thread`].
25///
26/// See function level documentation for details on the various configuration
27/// settings.
28///
29/// [`build`]: method@Self::build
30/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
31/// [`Builder::new_current_thread`]: method@Self::new_current_thread
32///
33/// # Examples
34///
35/// ```
36/// # #[cfg(not(target_family = "wasm"))]
37/// # {
38/// use tokio::runtime::Builder;
39///
40/// fn main() {
41///     // build runtime
42///     let runtime = Builder::new_multi_thread()
43///         .worker_threads(4)
44///         .thread_name("my-custom-name")
45///         .thread_stack_size(3 * 1024 * 1024)
46///         .build()
47///         .unwrap();
48///
49///     // use runtime ...
50/// }
51/// # }
52/// ```
pub struct Builder {
    /// Runtime type (which scheduler flavor to build).
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    // NOTE(review): presumably the event capacity handed to the I/O driver;
    // `Builder::new` initializes it to 1024 — confirm against the driver setup.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Upper limit on the additional threads spawned for blocking operations.
    ///
    /// Async worker threads are not included in this count.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// Callback to run before each worker thread stops.
    pub(super) before_stop: Option<Callback>,

    /// Callback to run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// Callback to run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// Callback to run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// Callback to run before each poll.
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// Callback to run after each poll.
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// Callback to run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime should respond to unhandled panics on spawned tasks.
    ///
    /// Initialized to `UnhandledPanic::Ignore` by `Builder::new`.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,

    /// Which timer implementation to use.
    ///
    /// Starts as `TimerFlavor::Traditional`; `enable_alt_timer` switches it to
    /// `TimerFlavor::Alternative`.
    timer_flavor: TimerFlavor,
}
141
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shut down if a spawned task panics.
        ///
        /// The runtime will immediately shut down even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and any call to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
225
/// Shared, thread-safe closure that produces the name used for threads
/// spawned by the runtime.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
/// Scheduler flavor a `Builder` has been configured with.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Current thread scheduler; selected by `Builder::new_current_thread`.
    CurrentThread,
    /// Multi thread scheduler; selected by `Builder::new_multi_thread`.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
234
235impl Builder {
236    /// Returns a new builder with the current thread scheduler selected.
237    ///
238    /// Configuration methods can be chained on the return value.
239    ///
240    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241    /// [`LocalSet`].
242    ///
243    /// [`LocalSet`]: crate::task::LocalSet
244    pub fn new_current_thread() -> Builder {
245        #[cfg(loom)]
246        const EVENT_INTERVAL: u32 = 4;
247        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
248        #[cfg(not(loom))]
249        const EVENT_INTERVAL: u32 = 61;
250
251        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
252    }
253
254    /// Returns a new builder with the multi thread scheduler selected.
255    ///
256    /// Configuration methods can be chained on the return value.
257    #[cfg(feature = "rt-multi-thread")]
258    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
259    pub fn new_multi_thread() -> Builder {
260        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
261        Builder::new(Kind::MultiThread, 61)
262    }
263
264    /// Returns a new runtime builder initialized with default configuration
265    /// values.
266    ///
267    /// Configuration methods can be chained on the return value.
268    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
269        Builder {
270            kind,
271
272            // I/O defaults to "off"
273            enable_io: false,
274            nevents: 1024,
275
276            // Time defaults to "off"
277            enable_time: false,
278
279            // The clock starts not-paused
280            start_paused: false,
281
282            // Read from environment variable first in multi-threaded mode.
283            // Default to lazy auto-detection (one thread per CPU core)
284            worker_threads: None,
285
286            max_blocking_threads: 512,
287
288            // Default thread name
289            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
290
291            // Do not set a stack size by default
292            thread_stack_size: None,
293
294            // No worker thread callbacks
295            after_start: None,
296            before_stop: None,
297            before_park: None,
298            after_unpark: None,
299
300            before_spawn: None,
301            after_termination: None,
302
303            #[cfg(tokio_unstable)]
304            before_poll: None,
305            #[cfg(tokio_unstable)]
306            after_poll: None,
307
308            keep_alive: None,
309
310            // Defaults for these values depend on the scheduler kind, so we get them
311            // as parameters.
312            global_queue_interval: None,
313            event_interval,
314
315            seed_generator: RngSeedGenerator::new(RngSeed::new()),
316
317            #[cfg(tokio_unstable)]
318            unhandled_panic: UnhandledPanic::Ignore,
319
320            metrics_poll_count_histogram_enable: false,
321
322            metrics_poll_count_histogram: HistogramBuilder::default(),
323
324            disable_lifo_slot: false,
325
326            timer_flavor: TimerFlavor::Traditional,
327        }
328    }
329
330    /// Enables both I/O and time drivers.
331    ///
332    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
333    /// individually. If additional components are added to Tokio in the future,
334    /// `enable_all` will include these future components.
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// # #[cfg(not(target_family = "wasm"))]
340    /// # {
341    /// use tokio::runtime;
342    ///
343    /// let rt = runtime::Builder::new_multi_thread()
344    ///     .enable_all()
345    ///     .build()
346    ///     .unwrap();
347    /// # }
348    /// ```
349    pub fn enable_all(&mut self) -> &mut Self {
350        #[cfg(any(
351            feature = "net",
352            all(unix, feature = "process"),
353            all(unix, feature = "signal")
354        ))]
355        self.enable_io();
356
357        #[cfg(all(
358            tokio_unstable,
359            feature = "io-uring",
360            feature = "rt",
361            feature = "fs",
362            target_os = "linux",
363        ))]
364        self.enable_io_uring();
365
366        #[cfg(feature = "time")]
367        self.enable_time();
368
369        self
370    }
371
372    /// Enables the alternative timer implementation, which is disabled by default.
373    ///
374    /// The alternative timer implementation is an unstable feature that may
375    /// provide better performance on multi-threaded runtimes with a large number
376    /// of worker threads.
377    ///
378    /// This option only applies to multi-threaded runtimes. Attempting to use
379    /// this option with any other runtime type will have no effect.
380    ///
381    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
382    ///
383    /// # Examples
384    ///
385    /// ```
386    /// # #[cfg(not(target_family = "wasm"))]
387    /// # {
388    /// use tokio::runtime;
389    ///
390    /// let rt = runtime::Builder::new_multi_thread()
391    ///   .enable_alt_timer()
392    ///   .build()
393    ///   .unwrap();
394    /// # }
395    /// ```
396    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
397    #[cfg_attr(
398        docsrs,
399        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
400    )]
401    pub fn enable_alt_timer(&mut self) -> &mut Self {
402        self.enable_time();
403        self.timer_flavor = TimerFlavor::Alternative;
404        self
405    }
406
407    /// Sets the number of worker threads the `Runtime` will use.
408    ///
409    /// This can be any number above 0 though it is advised to keep this value
410    /// on the smaller side.
411    ///
412    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
413    ///
414    /// # Default
415    ///
416    /// The default value is the number of cores available to the system.
417    ///
418    /// When using the `current_thread` runtime this method has no effect.
419    ///
420    /// # Examples
421    ///
422    /// ## Multi threaded runtime with 4 threads
423    ///
424    /// ```
425    /// # #[cfg(not(target_family = "wasm"))]
426    /// # {
427    /// use tokio::runtime;
428    ///
429    /// // This will spawn a work-stealing runtime with 4 worker threads.
430    /// let rt = runtime::Builder::new_multi_thread()
431    ///     .worker_threads(4)
432    ///     .build()
433    ///     .unwrap();
434    ///
435    /// rt.spawn(async move {});
436    /// # }
437    /// ```
438    ///
439    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
440    ///
441    /// ```
442    /// use tokio::runtime;
443    ///
444    /// // Create a runtime that _must_ be driven from a call
445    /// // to `Runtime::block_on`.
446    /// let rt = runtime::Builder::new_current_thread()
447    ///     .build()
448    ///     .unwrap();
449    ///
450    /// // This will run the runtime and future on the current thread
451    /// rt.block_on(async move {});
452    /// ```
453    ///
454    /// # Panics
455    ///
456    /// This will panic if `val` is not larger than `0`.
457    #[track_caller]
458    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
459        assert!(val > 0, "Worker threads cannot be set to 0");
460        self.worker_threads = Some(val);
461        self
462    }
463
464    /// Specifies the limit for additional threads spawned by the Runtime.
465    ///
466    /// These threads are used for blocking operations like tasks spawned
467    /// through [`spawn_blocking`], this includes but is not limited to:
468    /// - [`fs`] operations
469    /// - dns resolution through [`ToSocketAddrs`]
470    /// - writing to [`Stdout`] or [`Stderr`]
471    /// - reading from [`Stdin`]
472    ///
473    /// Unlike the [`worker_threads`], they are not always active and will exit
474    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
475    ///
476    /// It's recommended to not set this limit too low in order to avoid hanging on operations
477    /// requiring [`spawn_blocking`].
478    ///
479    /// The default value is 512.
480    ///
481    /// # Queue Behavior
482    ///
483    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
484    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
485    /// method has not been reached, a new thread will be spawned. If no idle thread is available
    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
    /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
488    /// it could potentially grow unbounded.
489    ///
490    /// # Panics
491    ///
492    /// This will panic if `val` is not larger than `0`.
493    ///
494    /// # Upgrading from 0.x
495    ///
496    /// In old versions `max_threads` limited both blocking and worker threads, but the
497    /// current `max_blocking_threads` does not include async worker threads in the count.
498    ///
499    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
500    /// [`fs`]: mod@crate::fs
501    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
502    /// [`Stdout`]: struct@crate::io::Stdout
503    /// [`Stdin`]: struct@crate::io::Stdin
504    /// [`Stderr`]: struct@crate::io::Stderr
505    /// [`worker_threads`]: Self::worker_threads
506    /// [`thread_keep_alive`]: Self::thread_keep_alive
507    #[track_caller]
508    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
509    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
510        assert!(val > 0, "Max blocking threads cannot be set to 0");
511        self.max_blocking_threads = val;
512        self
513    }
514
515    /// Sets name of threads spawned by the `Runtime`'s thread pool.
516    ///
517    /// The default name is "tokio-runtime-worker".
518    ///
519    /// # Examples
520    ///
521    /// ```
522    /// # #[cfg(not(target_family = "wasm"))]
523    /// # {
524    /// # use tokio::runtime;
525    ///
526    /// # pub fn main() {
527    /// let rt = runtime::Builder::new_multi_thread()
528    ///     .thread_name("my-pool")
529    ///     .build();
530    /// # }
531    /// # }
532    /// ```
533    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
534        let val = val.into();
535        self.thread_name = std::sync::Arc::new(move || val.clone());
536        self
537    }
538
539    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
540    ///
541    /// The default name fn is `|| "tokio-runtime-worker".into()`.
542    ///
543    /// # Examples
544    ///
545    /// ```
546    /// # #[cfg(not(target_family = "wasm"))]
547    /// # {
548    /// # use tokio::runtime;
549    /// # use std::sync::atomic::{AtomicUsize, Ordering};
550    /// # pub fn main() {
551    /// let rt = runtime::Builder::new_multi_thread()
552    ///     .thread_name_fn(|| {
553    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
554    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
555    ///        format!("my-pool-{}", id)
556    ///     })
557    ///     .build();
558    /// # }
559    /// # }
560    /// ```
561    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
562    where
563        F: Fn() -> String + Send + Sync + 'static,
564    {
565        self.thread_name = std::sync::Arc::new(f);
566        self
567    }
568
569    /// Sets the stack size (in bytes) for worker threads.
570    ///
    /// The actual stack size may be greater than this value if the platform
    /// specifies a minimal stack size.
573    ///
574    /// The default stack size for spawned threads is 2 MiB, though this
575    /// particular stack size is subject to change in the future.
576    ///
577    /// # Examples
578    ///
579    /// ```
580    /// # #[cfg(not(target_family = "wasm"))]
581    /// # {
582    /// # use tokio::runtime;
583    ///
584    /// # pub fn main() {
585    /// let rt = runtime::Builder::new_multi_thread()
586    ///     .thread_stack_size(32 * 1024)
587    ///     .build();
588    /// # }
589    /// # }
590    /// ```
591    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
592        self.thread_stack_size = Some(val);
593        self
594    }
595
596    /// Executes function `f` after each thread is started but before it starts
597    /// doing work.
598    ///
599    /// This is intended for bookkeeping and monitoring use cases.
600    ///
601    /// # Examples
602    ///
603    /// ```
604    /// # #[cfg(not(target_family = "wasm"))]
605    /// # {
606    /// # use tokio::runtime;
607    /// # pub fn main() {
608    /// let runtime = runtime::Builder::new_multi_thread()
609    ///     .on_thread_start(|| {
610    ///         println!("thread started");
611    ///     })
612    ///     .build();
613    /// # }
614    /// # }
615    /// ```
616    #[cfg(not(loom))]
617    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
618    where
619        F: Fn() + Send + Sync + 'static,
620    {
621        self.after_start = Some(std::sync::Arc::new(f));
622        self
623    }
624
625    /// Executes function `f` before each thread stops.
626    ///
627    /// This is intended for bookkeeping and monitoring use cases.
628    ///
629    /// # Examples
630    ///
631    /// ```
632    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
634    /// # use tokio::runtime;
635    /// # pub fn main() {
636    /// let runtime = runtime::Builder::new_multi_thread()
637    ///     .on_thread_stop(|| {
638    ///         println!("thread stopping");
639    ///     })
640    ///     .build();
641    /// # }
642    /// # }
643    /// ```
644    #[cfg(not(loom))]
645    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
646    where
647        F: Fn() + Send + Sync + 'static,
648    {
649        self.before_stop = Some(std::sync::Arc::new(f));
650        self
651    }
652
653    /// Executes function `f` just before a thread is parked (goes idle).
654    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
655    /// can be called, and may result in this thread being unparked immediately.
656    ///
657    /// This can be used to start work only when the executor is idle, or for bookkeeping
658    /// and monitoring purposes.
659    ///
660    /// Note: There can only be one park callback for a runtime; calling this function
661    /// more than once replaces the last callback defined, rather than adding to it.
662    ///
663    /// # Examples
664    ///
665    /// ## Multithreaded executor
666    /// ```
667    /// # #[cfg(not(target_family = "wasm"))]
668    /// # {
669    /// # use std::sync::Arc;
670    /// # use std::sync::atomic::{AtomicBool, Ordering};
671    /// # use tokio::runtime;
672    /// # use tokio::sync::Barrier;
673    /// # pub fn main() {
674    /// let once = AtomicBool::new(true);
675    /// let barrier = Arc::new(Barrier::new(2));
676    ///
677    /// let runtime = runtime::Builder::new_multi_thread()
678    ///     .worker_threads(1)
679    ///     .on_thread_park({
680    ///         let barrier = barrier.clone();
681    ///         move || {
682    ///             let barrier = barrier.clone();
683    ///             if once.swap(false, Ordering::Relaxed) {
684    ///                 tokio::spawn(async move { barrier.wait().await; });
685    ///            }
686    ///         }
687    ///     })
688    ///     .build()
689    ///     .unwrap();
690    ///
691    /// runtime.block_on(async {
692    ///    barrier.wait().await;
693    /// })
694    /// # }
695    /// # }
696    /// ```
697    /// ## Current thread executor
698    /// ```
699    /// # use std::sync::Arc;
700    /// # use std::sync::atomic::{AtomicBool, Ordering};
701    /// # use tokio::runtime;
702    /// # use tokio::sync::Barrier;
703    /// # pub fn main() {
704    /// let once = AtomicBool::new(true);
705    /// let barrier = Arc::new(Barrier::new(2));
706    ///
707    /// let runtime = runtime::Builder::new_current_thread()
708    ///     .on_thread_park({
709    ///         let barrier = barrier.clone();
710    ///         move || {
711    ///             let barrier = barrier.clone();
712    ///             if once.swap(false, Ordering::Relaxed) {
713    ///                 tokio::spawn(async move { barrier.wait().await; });
714    ///            }
715    ///         }
716    ///     })
717    ///     .build()
718    ///     .unwrap();
719    ///
720    /// runtime.block_on(async {
721    ///    barrier.wait().await;
722    /// })
723    /// # }
724    /// ```
725    #[cfg(not(loom))]
726    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
727    where
728        F: Fn() + Send + Sync + 'static,
729    {
730        self.before_park = Some(std::sync::Arc::new(f));
731        self
732    }
733
734    /// Executes function `f` just after a thread unparks (starts executing tasks).
735    ///
736    /// This is intended for bookkeeping and monitoring use cases; note that work
737    /// in this callback will increase latencies when the application has allowed one or
738    /// more runtime threads to go idle.
739    ///
740    /// Note: There can only be one unpark callback for a runtime; calling this function
741    /// more than once replaces the last callback defined, rather than adding to it.
742    ///
743    /// # Examples
744    ///
745    /// ```
746    /// # #[cfg(not(target_family = "wasm"))]
747    /// # {
748    /// # use tokio::runtime;
749    /// # pub fn main() {
750    /// let runtime = runtime::Builder::new_multi_thread()
751    ///     .on_thread_unpark(|| {
752    ///         println!("thread unparking");
753    ///     })
754    ///     .build();
755    ///
756    /// runtime.unwrap().block_on(async {
757    ///    tokio::task::yield_now().await;
758    ///    println!("Hello from Tokio!");
759    /// })
760    /// # }
761    /// # }
762    /// ```
763    #[cfg(not(loom))]
764    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
765    where
766        F: Fn() + Send + Sync + 'static,
767    {
768        self.after_unpark = Some(std::sync::Arc::new(f));
769        self
770    }
771
772    /// Executes function `f` just before a task is spawned.
773    ///
774    /// `f` is called within the Tokio context, so functions like
775    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
776    /// invoked immediately.
777    ///
778    /// This can be used for bookkeeping or monitoring purposes.
779    ///
780    /// Note: There can only be one spawn callback for a runtime; calling this function more
781    /// than once replaces the last callback defined, rather than adding to it.
782    ///
783    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
784    ///
785    /// **Note**: This is an [unstable API][unstable]. The public API of this type
786    /// may break in 1.x releases. See [the documentation on unstable
787    /// features][unstable] for details.
788    ///
789    /// [unstable]: crate#unstable-features
790    ///
791    /// # Examples
792    ///
793    /// ```
794    /// # use tokio::runtime;
795    /// # pub fn main() {
796    /// let runtime = runtime::Builder::new_current_thread()
797    ///     .on_task_spawn(|_| {
798    ///         println!("spawning task");
799    ///     })
800    ///     .build()
801    ///     .unwrap();
802    ///
803    /// runtime.block_on(async {
804    ///     tokio::task::spawn(std::future::ready(()));
805    ///
806    ///     for _ in 0..64 {
807    ///         tokio::task::yield_now().await;
808    ///     }
809    /// })
810    /// # }
811    /// ```
812    #[cfg(all(not(loom), tokio_unstable))]
813    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
814    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
815    where
816        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
817    {
818        self.before_spawn = Some(std::sync::Arc::new(f));
819        self
820    }
821
    /// Executes function `f` just before a task is polled.
823    ///
824    /// `f` is called within the Tokio context, so functions like
825    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
826    /// invoked immediately.
827    ///
828    /// **Note**: This is an [unstable API][unstable]. The public API of this type
829    /// may break in 1.x releases. See [the documentation on unstable
830    /// features][unstable] for details.
831    ///
832    /// [unstable]: crate#unstable-features
833    ///
834    /// # Examples
835    ///
836    /// ```
837    /// # #[cfg(not(target_family = "wasm"))]
838    /// # {
839    /// # use std::sync::{atomic::AtomicUsize, Arc};
840    /// # use tokio::task::yield_now;
841    /// # pub fn main() {
842    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
843    /// let poll_start = poll_start_counter.clone();
844    /// let rt = tokio::runtime::Builder::new_multi_thread()
845    ///     .enable_all()
846    ///     .on_before_task_poll(move |meta| {
847    ///         println!("task {} is about to be polled", meta.id())
848    ///     })
849    ///     .build()
850    ///     .unwrap();
851    /// let task = rt.spawn(async {
852    ///     yield_now().await;
853    /// });
854    /// let _ = rt.block_on(task);
855    ///
856    /// # }
857    /// # }
858    /// ```
859    #[cfg(tokio_unstable)]
860    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
861    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
862    where
863        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
864    {
865        self.before_poll = Some(std::sync::Arc::new(f));
866        self
867    }
868
    /// Executes function `f` just after a task is polled.
870    ///
871    /// `f` is called within the Tokio context, so functions like
872    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
873    /// invoked immediately.
874    ///
875    /// **Note**: This is an [unstable API][unstable]. The public API of this type
876    /// may break in 1.x releases. See [the documentation on unstable
877    /// features][unstable] for details.
878    ///
879    /// [unstable]: crate#unstable-features
880    ///
881    /// # Examples
882    ///
883    /// ```
884    /// # #[cfg(not(target_family = "wasm"))]
885    /// # {
886    /// # use std::sync::{atomic::AtomicUsize, Arc};
887    /// # use tokio::task::yield_now;
888    /// # pub fn main() {
889    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
890    /// let poll_stop = poll_stop_counter.clone();
891    /// let rt = tokio::runtime::Builder::new_multi_thread()
892    ///     .enable_all()
893    ///     .on_after_task_poll(move |meta| {
894    ///         println!("task {} completed polling", meta.id());
895    ///     })
896    ///     .build()
897    ///     .unwrap();
898    /// let task = rt.spawn(async {
899    ///     yield_now().await;
900    /// });
901    /// let _ = rt.block_on(task);
902    ///
903    /// # }
904    /// # }
905    /// ```
906    #[cfg(tokio_unstable)]
907    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
908    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
909    where
910        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
911    {
912        self.after_poll = Some(std::sync::Arc::new(f));
913        self
914    }
915
916    /// Executes function `f` just after a task is terminated.
917    ///
918    /// `f` is called within the Tokio context, so functions like
919    /// [`tokio::spawn`](crate::spawn) can be called.
920    ///
921    /// This can be used for bookkeeping or monitoring purposes.
922    ///
923    /// Note: There can only be one task termination callback for a runtime; calling this
924    /// function more than once replaces the last callback defined, rather than adding to it.
925    ///
926    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
927    ///
928    /// **Note**: This is an [unstable API][unstable]. The public API of this type
929    /// may break in 1.x releases. See [the documentation on unstable
930    /// features][unstable] for details.
931    ///
932    /// [unstable]: crate#unstable-features
933    ///
934    /// # Examples
935    ///
936    /// ```
937    /// # use tokio::runtime;
938    /// # pub fn main() {
939    /// let runtime = runtime::Builder::new_current_thread()
940    ///     .on_task_terminate(|_| {
941    ///         println!("killing task");
942    ///     })
943    ///     .build()
944    ///     .unwrap();
945    ///
946    /// runtime.block_on(async {
947    ///     tokio::task::spawn(std::future::ready(()));
948    ///
949    ///     for _ in 0..64 {
950    ///         tokio::task::yield_now().await;
951    ///     }
952    /// })
953    /// # }
954    /// ```
955    #[cfg(all(not(loom), tokio_unstable))]
956    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
957    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
958    where
959        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
960    {
961        self.after_termination = Some(std::sync::Arc::new(f));
962        self
963    }
964
965    /// Creates the configured `Runtime`.
966    ///
967    /// The returned `Runtime` instance is ready to spawn tasks.
968    ///
969    /// # Examples
970    ///
971    /// ```
972    /// # #[cfg(not(target_family = "wasm"))]
973    /// # {
974    /// use tokio::runtime::Builder;
975    ///
976    /// let rt  = Builder::new_multi_thread().build().unwrap();
977    ///
978    /// rt.block_on(async {
979    ///     println!("Hello from the Tokio runtime");
980    /// });
981    /// # }
982    /// ```
983    pub fn build(&mut self) -> io::Result<Runtime> {
984        match &self.kind {
985            Kind::CurrentThread => self.build_current_thread_runtime(),
986            #[cfg(feature = "rt-multi-thread")]
987            Kind::MultiThread => self.build_threaded_runtime(),
988        }
989    }
990
991    /// Creates the configured [`LocalRuntime`].
992    ///
993    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
994    ///
995    /// # Panics
996    ///
997    /// This will panic if the runtime is configured with [`new_multi_thread()`].
998    ///
999    /// [`new_multi_thread()`]: Builder::new_multi_thread
1000    ///
1001    /// # Examples
1002    ///
1003    /// ```
1004    /// use tokio::runtime::{Builder, LocalOptions};
1005    ///
1006    /// let rt = Builder::new_current_thread()
1007    ///     .build_local(LocalOptions::default())
1008    ///     .unwrap();
1009    ///
1010    /// rt.spawn_local(async {
1011    ///     println!("Hello from the Tokio runtime");
1012    /// });
1013    /// ```
1014    #[allow(unused_variables, unreachable_patterns)]
1015    #[cfg(tokio_unstable)]
1016    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1017    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1018        match &self.kind {
1019            Kind::CurrentThread => self.build_current_thread_local_runtime(),
1020            #[cfg(feature = "rt-multi-thread")]
1021            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1022        }
1023    }
1024
1025    fn get_cfg(&self) -> driver::Cfg {
1026        driver::Cfg {
1027            enable_pause_time: match self.kind {
1028                Kind::CurrentThread => true,
1029                #[cfg(feature = "rt-multi-thread")]
1030                Kind::MultiThread => false,
1031            },
1032            enable_io: self.enable_io,
1033            enable_time: self.enable_time,
1034            start_paused: self.start_paused,
1035            nevents: self.nevents,
1036            timer_flavor: self.timer_flavor,
1037        }
1038    }
1039
1040    /// Sets a custom timeout for a thread in the blocking pool.
1041    ///
1042    /// By default, the timeout for a thread is set to 10 seconds. This can
1043    /// be overridden using `.thread_keep_alive()`.
1044    ///
1045    /// # Example
1046    ///
1047    /// ```
1048    /// # #[cfg(not(target_family = "wasm"))]
1049    /// # {
1050    /// # use tokio::runtime;
1051    /// # use std::time::Duration;
1052    /// # pub fn main() {
1053    /// let rt = runtime::Builder::new_multi_thread()
1054    ///     .thread_keep_alive(Duration::from_millis(100))
1055    ///     .build();
1056    /// # }
1057    /// # }
1058    /// ```
1059    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1060        self.keep_alive = Some(duration);
1061        self
1062    }
1063
1064    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1065    /// task queue.
1066    ///
1067    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1068    ///
1069    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1070    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1071    ///
1072    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1073    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1074    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1075    /// getting started on new work, especially if tasks frequently yield rather than complete
1076    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1077    /// tasks from the local queue will be executed only if the global queue is empty.
1078    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1079    /// tasks quickly complete polling.
1080    ///
1081    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1082    ///
1083    /// # Panics
1084    ///
1085    /// This function will panic if 0 is passed as an argument.
1086    ///
1087    /// # Examples
1088    ///
1089    /// ```
1090    /// # #[cfg(not(target_family = "wasm"))]
1091    /// # {
1092    /// # use tokio::runtime;
1093    /// # pub fn main() {
1094    /// let rt = runtime::Builder::new_multi_thread()
1095    ///     .global_queue_interval(31)
1096    ///     .build();
1097    /// # }
1098    /// # }
1099    /// ```
1100    #[track_caller]
1101    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1102        assert!(val > 0, "global_queue_interval must be greater than 0");
1103        self.global_queue_interval = Some(val);
1104        self
1105    }
1106
1107    /// Sets the number of scheduler ticks after which the scheduler will poll for
1108    /// external events (timers, I/O, and so on).
1109    ///
1110    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1111    ///
1112    /// By default, the event interval is `61` for all scheduler types.
1113    ///
1114    /// Setting the event interval determines the effective "priority" of delivering
1115    /// these external events (which may wake up additional tasks), compared to
1116    /// executing tasks that are currently ready to run. A smaller value is useful
1117    /// when tasks frequently spend a long time in polling, or frequently yield,
1118    /// which can result in overly long delays picking up I/O events. Conversely,
1119    /// picking up new events requires extra synchronization and syscall overhead,
1120    /// so if tasks generally complete their polling quickly, a higher event interval
1121    /// will minimize that overhead while still keeping the scheduler responsive to
1122    /// events.
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```
1127    /// # #[cfg(not(target_family = "wasm"))]
1128    /// # {
1129    /// # use tokio::runtime;
1130    /// # pub fn main() {
1131    /// let rt = runtime::Builder::new_multi_thread()
1132    ///     .event_interval(31)
1133    ///     .build();
1134    /// # }
1135    /// # }
1136    /// ```
1137    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1138        self.event_interval = val;
1139        self
1140    }
1141
1142    cfg_unstable! {
1143        /// Configure how the runtime responds to an unhandled panic on a
1144        /// spawned task.
1145        ///
1146        /// By default, an unhandled panic (i.e. a panic not caught by
1147        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1148        /// execution. The panic's error value is forwarded to the task's
1149        /// [`JoinHandle`] and all other spawned tasks continue running.
1150        ///
1151        /// The `unhandled_panic` option enables configuring this behavior.
1152        ///
1153        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1154        ///   spawned tasks have no impact on the runtime's execution.
1155        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1156        ///   shutdown immediately when a spawned task panics even if that
1157        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1158        ///   will immediately terminate and further calls to
1159        ///   [`Runtime::block_on`] will panic.
1160        ///
1161        /// # Panics
1162        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1163        /// on a runtime other than the current thread runtime.
1164        ///
1165        /// # Unstable
1166        ///
1167        /// This option is currently unstable and its implementation is
1168        /// incomplete. The API may change or be removed in the future. See
1169        /// issue [tokio-rs/tokio#4516] for more details.
1170        ///
1171        /// # Examples
1172        ///
1173        /// The following demonstrates a runtime configured to shutdown on
1174        /// panic. The first spawned task panics and results in the runtime
1175        /// shutting down. The second spawned task never has a chance to
1176        /// execute. The call to `block_on` will panic due to the runtime being
1177        /// forcibly shutdown.
1178        ///
1179        /// ```should_panic
1180        /// use tokio::runtime::{self, UnhandledPanic};
1181        ///
1182        /// # pub fn main() {
1183        /// let rt = runtime::Builder::new_current_thread()
1184        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1185        ///     .build()
1186        ///     .unwrap();
1187        ///
1188        /// rt.spawn(async { panic!("boom"); });
1189        /// rt.spawn(async {
1190        ///     // This task never completes.
1191        /// });
1192        ///
1193        /// rt.block_on(async {
1194        ///     // Do some work
1195        /// # loop { tokio::task::yield_now().await; }
1196        /// })
1197        /// # }
1198        /// ```
1199        ///
1200        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1201        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1202        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1203            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1204                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1205            }
1206
1207            self.unhandled_panic = behavior;
1208            self
1209        }
1210
1211        /// Disables the LIFO task scheduler heuristic.
1212        ///
1213        /// The multi-threaded scheduler includes a heuristic for optimizing
1214        /// message-passing patterns. This heuristic results in the **last**
1215        /// scheduled task being polled first.
1216        ///
1217        /// To implement this heuristic, each worker thread has a slot which
1218        /// holds the task that should be polled next. However, this slot cannot
1219        /// be stolen by other worker threads, which can result in lower total
1220        /// throughput when tasks tend to have longer poll times.
1221        ///
1222        /// This configuration option will disable this heuristic resulting in
1223        /// all scheduled tasks being pushed into the worker-local queue, which
1224        /// is stealable.
1225        ///
1226        /// Consider trying this option when the task "scheduled" time is high
1227        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1228        /// collect this data.
1229        ///
1230        /// # Unstable
1231        ///
1232        /// This configuration option is considered a workaround for the LIFO
1233        /// slot not being stealable. When the slot becomes stealable, we will
1234        /// revisit whether or not this option is necessary. See
1235        /// issue [tokio-rs/tokio#4941].
1236        ///
1237        /// # Examples
1238        ///
1239        /// ```
1240        /// # #[cfg(not(target_family = "wasm"))]
1241        /// # {
1242        /// use tokio::runtime;
1243        ///
1244        /// let rt = runtime::Builder::new_multi_thread()
1245        ///     .disable_lifo_slot()
1246        ///     .build()
1247        ///     .unwrap();
1248        /// # }
1249        /// ```
1250        ///
1251        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1252        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1253        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1254            self.disable_lifo_slot = true;
1255            self
1256        }
1257
1258        /// Specifies the random number generation seed to use within all
1259        /// threads associated with the runtime being built.
1260        ///
1261        /// This option is intended to make certain parts of the runtime
1262        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1263        /// [`tokio::select!`] it will ensure that the order that branches are
1264        /// polled is deterministic.
1265        ///
1266        /// In addition to the code specifying `rng_seed` and interacting with
1267        /// the runtime, the internals of Tokio and the Rust compiler may affect
1268        /// the sequences of random numbers. In order to ensure repeatable
1269        /// results, the version of Tokio, the versions of all other
1270        /// dependencies that interact with Tokio, and the Rust compiler version
1271        /// should also all remain constant.
1272        ///
1273        /// # Examples
1274        ///
1275        /// ```
1276        /// # use tokio::runtime::{self, RngSeed};
1277        /// # pub fn main() {
1278        /// let seed = RngSeed::from_bytes(b"place your seed here");
1279        /// let rt = runtime::Builder::new_current_thread()
1280        ///     .rng_seed(seed)
1281        ///     .build();
1282        /// # }
1283        /// ```
1284        ///
1285        /// [`tokio::select!`]: crate::select
1286        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1287            self.seed_generator = RngSeedGenerator::new(seed);
1288            self
1289        }
1290    }
1291
1292    cfg_unstable_metrics! {
1293        /// Enables tracking the distribution of task poll times.
1294        ///
1295        /// Task poll times are not instrumented by default as doing so requires
1296        /// calling [`Instant::now()`] twice per task poll, which could add
1297        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1298        /// metrics data.
1299        ///
1300        /// The histogram uses fixed bucket sizes. In other words, the histogram
1301        /// buckets are not dynamic based on input values. Use the
1302        /// `metrics_poll_time_histogram` builder methods to configure the
1303        /// histogram details.
1304        ///
1305        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1306        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1307        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1308        /// to select [`LogHistogram`] instead.
1309        ///
1310        /// # Examples
1311        ///
1312        /// ```
1313        /// # #[cfg(not(target_family = "wasm"))]
1314        /// # {
1315        /// use tokio::runtime;
1316        ///
1317        /// let rt = runtime::Builder::new_multi_thread()
1318        ///     .enable_metrics_poll_time_histogram()
1319        ///     .build()
1320        ///     .unwrap();
1321        /// # // Test default values here
1322        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1323        /// # let m = rt.handle().metrics();
1324        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1325        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1326        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1327        /// # }
1328        /// ```
1329        ///
1330        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1331        /// [`Instant::now()`]: std::time::Instant::now
1332        /// [`LogHistogram`]: crate::runtime::LogHistogram
1333        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1334        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1335            self.metrics_poll_count_histogram_enable = true;
1336            self
1337        }
1338
1339        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1340        ///
1341        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1342        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1343        #[doc(hidden)]
1344        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1345            self.enable_metrics_poll_time_histogram()
1346        }
1347
1348        /// Sets the histogram scale for tracking the distribution of task poll
1349        /// times.
1350        ///
1351        /// Tracking the distribution of task poll times can be done using a
1352        /// linear or log scale. When using linear scale, each histogram bucket
1353        /// will represent the same range of poll times. When using log scale,
1354        /// each histogram bucket will cover a range twice as big as the
1355        /// previous bucket.
1356        ///
1357        /// **Default:** linear scale.
1358        ///
1359        /// # Examples
1360        ///
1361        /// ```
1362        /// # #[cfg(not(target_family = "wasm"))]
1363        /// # {
1364        /// use tokio::runtime::{self, HistogramScale};
1365        ///
1366        /// # #[allow(deprecated)]
1367        /// let rt = runtime::Builder::new_multi_thread()
1368        ///     .enable_metrics_poll_time_histogram()
1369        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1370        ///     .build()
1371        ///     .unwrap();
1372        /// # }
1373        /// ```
1374        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1375        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1376            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1377            self
1378        }
1379
1380        /// Configure the histogram for tracking poll times
1381        ///
1382        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1383        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1384        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1385        ///
1386        /// # Examples
1387        /// Configure a [`LogHistogram`] with [default configuration]:
1388        /// ```
1389        /// # #[cfg(not(target_family = "wasm"))]
1390        /// # {
1391        /// use tokio::runtime;
1392        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1393        ///
1394        /// let rt = runtime::Builder::new_multi_thread()
1395        ///     .enable_metrics_poll_time_histogram()
1396        ///     .metrics_poll_time_histogram_configuration(
1397        ///         HistogramConfiguration::log(LogHistogram::default())
1398        ///     )
1399        ///     .build()
1400        ///     .unwrap();
1401        /// # }
1402        /// ```
1403        ///
1404        /// Configure a linear histogram with 100 buckets, each 10μs wide
1405        /// ```
1406        /// # #[cfg(not(target_family = "wasm"))]
1407        /// # {
1408        /// use tokio::runtime;
1409        /// use std::time::Duration;
1410        /// use tokio::runtime::HistogramConfiguration;
1411        ///
1412        /// let rt = runtime::Builder::new_multi_thread()
1413        ///     .enable_metrics_poll_time_histogram()
1414        ///     .metrics_poll_time_histogram_configuration(
1415        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1416        ///     )
1417        ///     .build()
1418        ///     .unwrap();
1419        /// # }
1420        /// ```
1421        ///
1422        /// Configure a [`LogHistogram`] with the following settings:
1423        /// - Measure times from 100ns to 120s
1424        /// - Max error of 0.1
1425        /// - No more than 1024 buckets
1426        /// ```
1427        /// # #[cfg(not(target_family = "wasm"))]
1428        /// # {
1429        /// use std::time::Duration;
1430        /// use tokio::runtime;
1431        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1432        ///
1433        /// let rt = runtime::Builder::new_multi_thread()
1434        ///     .enable_metrics_poll_time_histogram()
1435        ///     .metrics_poll_time_histogram_configuration(
1436        ///         HistogramConfiguration::log(LogHistogram::builder()
1437        ///             .max_value(Duration::from_secs(120))
1438        ///             .min_value(Duration::from_nanos(100))
1439        ///             .max_error(0.1)
1440        ///             .max_buckets(1024)
1441        ///             .expect("configuration uses 488 buckets")
1442        ///         )
1443        ///     )
1444        ///     .build()
1445        ///     .unwrap();
1446        /// # }
1447        /// ```
1448        ///
1449        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1450        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1451        /// where each bucket is twice the size of the previous bucket.
1452        /// ```rust
1453        /// use std::time::Duration;
1454        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1455        /// let rt = tokio::runtime::Builder::new_current_thread()
1456        ///     .enable_all()
1457        ///     .enable_metrics_poll_time_histogram()
1458        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1459        ///         LogHistogram::builder()
1460        ///             .min_value(Duration::from_micros(20))
1461        ///             .max_value(Duration::from_millis(4))
1462        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1463        ///             .precision_exact(0)
1464        ///             .max_buckets(10)
1465        ///             .unwrap(),
1466        ///     ))
1467        ///     .build()
1468        ///     .unwrap();
1469        /// ```
1470        ///
1471        /// [`LogHistogram`]: crate::runtime::LogHistogram
1472        /// [default configuration]: crate::runtime::LogHistogramBuilder
1473        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1474        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1475            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1476            self
1477        }
1478
1479        /// Sets the histogram resolution for tracking the distribution of task
1480        /// poll times.
1481        ///
1482        /// The resolution is the histogram's first bucket's range. When using a
1483        /// linear histogram scale, each bucket will cover the same range. When
1484        /// using a log scale, each bucket will cover a range twice as big as
1485        /// the previous bucket. In the log case, the resolution represents the
1486        /// smallest bucket range.
1487        ///
1488        /// Note that, when using log scale, the resolution is rounded up to the
1489        /// nearest power of 2 in nanoseconds.
1490        ///
1491        /// **Default:** 100 microseconds.
1492        ///
1493        /// # Examples
1494        ///
1495        /// ```
1496        /// # #[cfg(not(target_family = "wasm"))]
1497        /// # {
1498        /// use tokio::runtime;
1499        /// use std::time::Duration;
1500        ///
1501        /// # #[allow(deprecated)]
1502        /// let rt = runtime::Builder::new_multi_thread()
1503        ///     .enable_metrics_poll_time_histogram()
1504        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1505        ///     .build()
1506        ///     .unwrap();
1507        /// # }
1508        /// ```
1509        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1510        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1511            assert!(resolution > Duration::from_secs(0));
1512            // Sanity check the argument and also make the cast below safe.
1513            assert!(resolution <= Duration::from_secs(1));
1514
1515            let resolution = resolution.as_nanos() as u64;
1516
1517            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1518            self
1519        }
1520
1521        /// Sets the number of buckets for the histogram tracking the
1522        /// distribution of task poll times.
1523        ///
1524        /// The last bucket tracks all greater values that fall out of other
1525        /// ranges. So, configuring the histogram using a linear scale,
1526        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1527        /// polls that take more than 450ms to complete.
1528        ///
1529        /// **Default:** 10
1530        ///
1531        /// # Examples
1532        ///
1533        /// ```
1534        /// # #[cfg(not(target_family = "wasm"))]
1535        /// # {
1536        /// use tokio::runtime;
1537        ///
1538        /// # #[allow(deprecated)]
1539        /// let rt = runtime::Builder::new_multi_thread()
1540        ///     .enable_metrics_poll_time_histogram()
1541        ///     .metrics_poll_count_histogram_buckets(15)
1542        ///     .build()
1543        ///     .unwrap();
1544        /// # }
1545        /// ```
1546        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1547        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1548            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1549            self
1550        }
1551    }
1552
1553    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1554        use crate::runtime::runtime::Scheduler;
1555
1556        let (scheduler, handle, blocking_pool) =
1557            self.build_current_thread_runtime_components(None)?;
1558
1559        Ok(Runtime::from_parts(
1560            Scheduler::CurrentThread(scheduler),
1561            handle,
1562            blocking_pool,
1563        ))
1564    }
1565
1566    #[cfg(tokio_unstable)]
1567    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1568        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1569
1570        let tid = std::thread::current().id();
1571
1572        let (scheduler, handle, blocking_pool) =
1573            self.build_current_thread_runtime_components(Some(tid))?;
1574
1575        Ok(LocalRuntime::from_parts(
1576            LocalRuntimeScheduler::CurrentThread(scheduler),
1577            handle,
1578            blocking_pool,
1579        ))
1580    }
1581
    /// Creates the pieces shared by the current-thread runtime flavors: the
    /// `CurrentThread` scheduler, the runtime `Handle`, and the blocking pool.
    ///
    /// `local_tid` is forwarded unchanged to `CurrentThread::new`. The local
    /// runtime builder passes `Some(thread id)`, the regular one passes `None`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `driver::Driver::new` if the driver
    /// cannot be created.
    fn build_current_thread_runtime_components(
        &mut self,
        local_tid: Option<ThreadId>,
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
        use crate::runtime::scheduler;
        use crate::runtime::Config;

        // Start from the builder's driver configuration, but always use the
        // traditional timer flavor here, overriding the builder's `timer_flavor`.
        let mut cfg = self.get_cfg();
        cfg.timer_flavor = TimerFlavor::Traditional;
        let (driver, driver_handle) = driver::Driver::new(cfg)?;

        // Blocking pool, created from this builder and capped at `max_blocking_threads`.
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        // Generate rng seeds for this runtime: the first generator goes into the
        // scheduler `Config`, the second is passed directly to `CurrentThread::new`.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();

        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to do something, it'll let the timer or
        // the reactor generate some new stimuli for the futures to continue
        // in their life.
        let (scheduler, handle) = CurrentThread::new(
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            // The scheduler configuration is a copy of the builder's settings;
            // the builder itself is left untouched.
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                before_spawn: self.before_spawn.clone(),
                #[cfg(tokio_unstable)]
                before_poll: self.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll: self.after_poll.clone(),
                after_termination: self.after_termination.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
            },
            local_tid,
        );

        // Wrap the scheduler-specific handle in the runtime-level `Handle`.
        let handle = Handle {
            inner: scheduler::Handle::CurrentThread(handle),
        };

        Ok((scheduler, handle, blocking_pool))
    }
1636
1637    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1638        if self.metrics_poll_count_histogram_enable {
1639            Some(self.metrics_poll_count_histogram.clone())
1640        } else {
1641            None
1642        }
1643    }
1644}
1645
1646cfg_io_driver! {
1647    impl Builder {
1648        /// Enables the I/O driver.
1649        ///
1650        /// Doing this enables using net, process, signal, and some I/O types on
1651        /// the runtime.
1652        ///
1653        /// # Examples
1654        ///
1655        /// ```
1656        /// use tokio::runtime;
1657        ///
1658        /// let rt = runtime::Builder::new_multi_thread()
1659        ///     .enable_io()
1660        ///     .build()
1661        ///     .unwrap();
1662        /// ```
1663        pub fn enable_io(&mut self) -> &mut Self {
1664            self.enable_io = true;
1665            self
1666        }
1667
1668        /// Enables the I/O driver and configures the max number of events to be
1669        /// processed per tick.
1670        ///
1671        /// # Examples
1672        ///
1673        /// ```
1674        /// use tokio::runtime;
1675        ///
1676        /// let rt = runtime::Builder::new_current_thread()
1677        ///     .enable_io()
1678        ///     .max_io_events_per_tick(1024)
1679        ///     .build()
1680        ///     .unwrap();
1681        /// ```
1682        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1683            self.nevents = capacity;
1684            self
1685        }
1686    }
1687}
1688
1689cfg_time! {
1690    impl Builder {
1691        /// Enables the time driver.
1692        ///
1693        /// Doing this enables using `tokio::time` on the runtime.
1694        ///
1695        /// # Examples
1696        ///
1697        /// ```
1698        /// # #[cfg(not(target_family = "wasm"))]
1699        /// # {
1700        /// use tokio::runtime;
1701        ///
1702        /// let rt = runtime::Builder::new_multi_thread()
1703        ///     .enable_time()
1704        ///     .build()
1705        ///     .unwrap();
1706        /// # }
1707        /// ```
1708        pub fn enable_time(&mut self) -> &mut Self {
1709            self.enable_time = true;
1710            self
1711        }
1712    }
1713}
1714
1715cfg_io_uring! {
1716    impl Builder {
1717        /// Enables the tokio's io_uring driver.
1718        ///
1719        /// Doing this enables using io_uring operations on the runtime.
1720        ///
1721        /// # Examples
1722        ///
1723        /// ```
1724        /// use tokio::runtime;
1725        ///
1726        /// let rt = runtime::Builder::new_multi_thread()
1727        ///     .enable_io_uring()
1728        ///     .build()
1729        ///     .unwrap();
1730        /// ```
1731        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1732        pub fn enable_io_uring(&mut self) -> &mut Self {
1733            // Currently, the uring flag is equivalent to `enable_io`.
1734            self.enable_io = true;
1735            self
1736        }
1737    }
1738}
1739
1740cfg_test_util! {
1741    impl Builder {
1742        /// Controls if the runtime's clock starts paused or advancing.
1743        ///
1744        /// Pausing time requires the current-thread runtime; construction of
1745        /// the runtime will panic otherwise.
1746        ///
1747        /// # Examples
1748        ///
1749        /// ```
1750        /// use tokio::runtime;
1751        ///
1752        /// let rt = runtime::Builder::new_current_thread()
1753        ///     .enable_time()
1754        ///     .start_paused(true)
1755        ///     .build()
1756        ///     .unwrap();
1757        /// ```
1758        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1759            self.start_paused = start_paused;
1760            self
1761        }
1762    }
1763}
1764
1765cfg_rt_multi_thread! {
1766    impl Builder {
1767        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1768            use crate::loom::sys::num_cpus;
1769            use crate::runtime::{Config, runtime::Scheduler};
1770            use crate::runtime::scheduler::{self, MultiThread};
1771
1772            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1773
1774            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1775
1776            // Create the blocking pool
1777            let blocking_pool =
1778                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1779            let blocking_spawner = blocking_pool.spawner().clone();
1780
1781            // Generate a rng seed for this runtime.
1782            let seed_generator_1 = self.seed_generator.next_generator();
1783            let seed_generator_2 = self.seed_generator.next_generator();
1784
1785            let (scheduler, handle, launch) = MultiThread::new(
1786                worker_threads,
1787                driver,
1788                driver_handle,
1789                blocking_spawner,
1790                seed_generator_2,
1791                Config {
1792                    before_park: self.before_park.clone(),
1793                    after_unpark: self.after_unpark.clone(),
1794                    before_spawn: self.before_spawn.clone(),
1795                    #[cfg(tokio_unstable)]
1796                    before_poll: self.before_poll.clone(),
1797                    #[cfg(tokio_unstable)]
1798                    after_poll: self.after_poll.clone(),
1799                    after_termination: self.after_termination.clone(),
1800                    global_queue_interval: self.global_queue_interval,
1801                    event_interval: self.event_interval,
1802                    #[cfg(tokio_unstable)]
1803                    unhandled_panic: self.unhandled_panic.clone(),
1804                    disable_lifo_slot: self.disable_lifo_slot,
1805                    seed_generator: seed_generator_1,
1806                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1807                },
1808                self.timer_flavor,
1809            );
1810
1811            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1812
1813            // Spawn the thread pool workers
1814            let _enter = handle.enter();
1815            launch.launch();
1816
1817            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1818        }
1819    }
1820}
1821
1822impl fmt::Debug for Builder {
1823    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1824        fmt.debug_struct("Builder")
1825            .field("worker_threads", &self.worker_threads)
1826            .field("max_blocking_threads", &self.max_blocking_threads)
1827            .field(
1828                "thread_name",
1829                &"<dyn Fn() -> String + Send + Sync + 'static>",
1830            )
1831            .field("thread_stack_size", &self.thread_stack_size)
1832            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1833            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1834            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1835            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1836            .finish()
1837    }
1838}