tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5 blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11use crate::runtime::blocking::BlockingPool;
12use crate::runtime::scheduler::CurrentThread;
13use std::fmt;
14use std::io;
15use std::thread::ThreadId;
16use std::time::Duration;
17
18/// Builds Tokio Runtime with custom configuration values.
19///
20/// Methods can be chained in order to set the configuration values. The
21/// Runtime is constructed by calling [`build`].
22///
23/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
24/// or [`Builder::new_current_thread`].
25///
26/// See function level documentation for details on the various configuration
27/// settings.
28///
29/// [`build`]: method@Self::build
30/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
31/// [`Builder::new_current_thread`]: method@Self::new_current_thread
32///
33/// # Examples
34///
35/// ```
36/// # #[cfg(not(target_family = "wasm"))]
37/// # {
38/// use tokio::runtime::Builder;
39///
40/// fn main() {
41/// // build runtime
42/// let runtime = Builder::new_multi_thread()
43/// .worker_threads(4)
44/// .thread_name("my-custom-name")
45/// .thread_stack_size(3 * 1024 * 1024)
46/// .build()
47/// .unwrap();
48///
49/// // use runtime ...
50/// }
51/// # }
52/// ```
53pub struct Builder {
54 /// Runtime type
55 kind: Kind,
56
57 /// Whether or not to enable the I/O driver
58 enable_io: bool,
59 nevents: usize,
60
61 /// Whether or not to enable the time driver
62 enable_time: bool,
63
64 /// Whether or not the clock should start paused.
65 start_paused: bool,
66
67 /// The number of worker threads, used by Runtime.
68 ///
69 /// Only used when not using the current-thread executor.
70 worker_threads: Option<usize>,
71
72 /// Cap on thread usage.
73 max_blocking_threads: usize,
74
75 /// Name fn used for threads spawned by the runtime.
76 pub(super) thread_name: ThreadNameFn,
77
78 /// Stack size used for threads spawned by the runtime.
79 pub(super) thread_stack_size: Option<usize>,
80
81 /// Callback to run after each thread starts.
82 pub(super) after_start: Option<Callback>,
83
84 /// To run before each worker thread stops
85 pub(super) before_stop: Option<Callback>,
86
87 /// To run before each worker thread is parked.
88 pub(super) before_park: Option<Callback>,
89
90 /// To run after each thread is unparked.
91 pub(super) after_unpark: Option<Callback>,
92
93 /// To run before each task is spawned.
94 pub(super) before_spawn: Option<TaskCallback>,
95
96 /// To run before each poll
97 #[cfg(tokio_unstable)]
98 pub(super) before_poll: Option<TaskCallback>,
99
100 /// To run after each poll
101 #[cfg(tokio_unstable)]
102 pub(super) after_poll: Option<TaskCallback>,
103
104 /// To run after each task is terminated.
105 pub(super) after_termination: Option<TaskCallback>,
106
107 /// Customizable keep alive timeout for `BlockingPool`
108 pub(super) keep_alive: Option<Duration>,
109
110 /// How many ticks before pulling a task from the global/remote queue?
111 ///
112 /// When `None`, the value is unspecified and behavior details are left to
113 /// the scheduler. Each scheduler flavor could choose to either pick its own
114 /// default value or use some other strategy to decide when to poll from the
115 /// global queue. For example, the multi-threaded scheduler uses a
116 /// self-tuning strategy based on mean task poll times.
117 pub(super) global_queue_interval: Option<u32>,
118
119 /// How many ticks before yielding to the driver for timer and I/O events?
120 pub(super) event_interval: u32,
121
122 /// When true, the multi-threade scheduler LIFO slot should not be used.
123 ///
124 /// This option should only be exposed as unstable.
125 pub(super) disable_lifo_slot: bool,
126
127 /// Specify a random number generator seed to provide deterministic results
128 pub(super) seed_generator: RngSeedGenerator,
129
130 /// When true, enables task poll count histogram instrumentation.
131 pub(super) metrics_poll_count_histogram_enable: bool,
132
133 /// Configures the task poll count histogram
134 pub(super) metrics_poll_count_histogram: HistogramBuilder,
135
136 #[cfg(tokio_unstable)]
137 pub(super) unhandled_panic: UnhandledPanic,
138
139 timer_flavor: TimerFlavor,
140}
141
142cfg_unstable! {
143 /// How the runtime should respond to unhandled panics.
144 ///
145 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
146 /// to configure the runtime behavior when a spawned task panics.
147 ///
148 /// See [`Builder::unhandled_panic`] for more details.
149 #[derive(Debug, Clone)]
150 #[non_exhaustive]
151 pub enum UnhandledPanic {
152 /// The runtime should ignore panics on spawned tasks.
153 ///
154 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
155 /// tasks continue running normally.
156 ///
157 /// This is the default behavior.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// # #[cfg(not(target_family = "wasm"))]
163 /// # {
164 /// use tokio::runtime::{self, UnhandledPanic};
165 ///
166 /// # pub fn main() {
167 /// let rt = runtime::Builder::new_current_thread()
168 /// .unhandled_panic(UnhandledPanic::Ignore)
169 /// .build()
170 /// .unwrap();
171 ///
172 /// let task1 = rt.spawn(async { panic!("boom"); });
173 /// let task2 = rt.spawn(async {
174 /// // This task completes normally
175 /// "done"
176 /// });
177 ///
178 /// rt.block_on(async {
179 /// // The panic on the first task is forwarded to the `JoinHandle`
180 /// assert!(task1.await.is_err());
181 ///
182 /// // The second task completes normally
183 /// assert!(task2.await.is_ok());
184 /// })
185 /// # }
186 /// # }
187 /// ```
188 ///
189 /// [`JoinHandle`]: struct@crate::task::JoinHandle
190 Ignore,
191
192 /// The runtime should immediately shutdown if a spawned task panics.
193 ///
194 /// The runtime will immediately shutdown even if the panicked task's
195 /// [`JoinHandle`] is still available. All further spawned tasks will be
196 /// immediately dropped and call to [`Runtime::block_on`] will panic.
197 ///
198 /// # Examples
199 ///
200 /// ```should_panic
201 /// use tokio::runtime::{self, UnhandledPanic};
202 ///
203 /// # pub fn main() {
204 /// let rt = runtime::Builder::new_current_thread()
205 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
206 /// .build()
207 /// .unwrap();
208 ///
209 /// rt.spawn(async { panic!("boom"); });
210 /// rt.spawn(async {
211 /// // This task never completes.
212 /// });
213 ///
214 /// rt.block_on(async {
215 /// // Do some work
216 /// # loop { tokio::task::yield_now().await; }
217 /// })
218 /// # }
219 /// ```
220 ///
221 /// [`JoinHandle`]: struct@crate::task::JoinHandle
222 ShutdownRuntime,
223 }
224}
225
226pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
228#[derive(Clone, Copy)]
229pub(crate) enum Kind {
230 CurrentThread,
231 #[cfg(feature = "rt-multi-thread")]
232 MultiThread,
233}
234
235impl Builder {
236 /// Returns a new builder with the current thread scheduler selected.
237 ///
238 /// Configuration methods can be chained on the return value.
239 ///
240 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241 /// [`LocalSet`].
242 ///
243 /// [`LocalSet`]: crate::task::LocalSet
244 pub fn new_current_thread() -> Builder {
245 #[cfg(loom)]
246 const EVENT_INTERVAL: u32 = 4;
247 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
248 #[cfg(not(loom))]
249 const EVENT_INTERVAL: u32 = 61;
250
251 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
252 }
253
254 /// Returns a new builder with the multi thread scheduler selected.
255 ///
256 /// Configuration methods can be chained on the return value.
257 #[cfg(feature = "rt-multi-thread")]
258 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
259 pub fn new_multi_thread() -> Builder {
260 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
261 Builder::new(Kind::MultiThread, 61)
262 }
263
264 /// Returns a new runtime builder initialized with default configuration
265 /// values.
266 ///
267 /// Configuration methods can be chained on the return value.
268 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
269 Builder {
270 kind,
271
272 // I/O defaults to "off"
273 enable_io: false,
274 nevents: 1024,
275
276 // Time defaults to "off"
277 enable_time: false,
278
279 // The clock starts not-paused
280 start_paused: false,
281
282 // Read from environment variable first in multi-threaded mode.
283 // Default to lazy auto-detection (one thread per CPU core)
284 worker_threads: None,
285
286 max_blocking_threads: 512,
287
288 // Default thread name
289 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
290
291 // Do not set a stack size by default
292 thread_stack_size: None,
293
294 // No worker thread callbacks
295 after_start: None,
296 before_stop: None,
297 before_park: None,
298 after_unpark: None,
299
300 before_spawn: None,
301 after_termination: None,
302
303 #[cfg(tokio_unstable)]
304 before_poll: None,
305 #[cfg(tokio_unstable)]
306 after_poll: None,
307
308 keep_alive: None,
309
310 // Defaults for these values depend on the scheduler kind, so we get them
311 // as parameters.
312 global_queue_interval: None,
313 event_interval,
314
315 seed_generator: RngSeedGenerator::new(RngSeed::new()),
316
317 #[cfg(tokio_unstable)]
318 unhandled_panic: UnhandledPanic::Ignore,
319
320 metrics_poll_count_histogram_enable: false,
321
322 metrics_poll_count_histogram: HistogramBuilder::default(),
323
324 disable_lifo_slot: false,
325
326 timer_flavor: TimerFlavor::Traditional,
327 }
328 }
329
330 /// Enables both I/O and time drivers.
331 ///
332 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
333 /// individually. If additional components are added to Tokio in the future,
334 /// `enable_all` will include these future components.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// # #[cfg(not(target_family = "wasm"))]
340 /// # {
341 /// use tokio::runtime;
342 ///
343 /// let rt = runtime::Builder::new_multi_thread()
344 /// .enable_all()
345 /// .build()
346 /// .unwrap();
347 /// # }
348 /// ```
349 pub fn enable_all(&mut self) -> &mut Self {
350 #[cfg(any(
351 feature = "net",
352 all(unix, feature = "process"),
353 all(unix, feature = "signal")
354 ))]
355 self.enable_io();
356
357 #[cfg(all(
358 tokio_unstable,
359 feature = "io-uring",
360 feature = "rt",
361 feature = "fs",
362 target_os = "linux",
363 ))]
364 self.enable_io_uring();
365
366 #[cfg(feature = "time")]
367 self.enable_time();
368
369 self
370 }
371
372 /// Enables the alternative timer implementation, which is disabled by default.
373 ///
374 /// The alternative timer implementation is an unstable feature that may
375 /// provide better performance on multi-threaded runtimes with a large number
376 /// of worker threads.
377 ///
378 /// This option only applies to multi-threaded runtimes. Attempting to use
379 /// this option with any other runtime type will have no effect.
380 ///
381 /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
382 ///
383 /// # Examples
384 ///
385 /// ```
386 /// # #[cfg(not(target_family = "wasm"))]
387 /// # {
388 /// use tokio::runtime;
389 ///
390 /// let rt = runtime::Builder::new_multi_thread()
391 /// .enable_alt_timer()
392 /// .build()
393 /// .unwrap();
394 /// # }
395 /// ```
396 #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
397 #[cfg_attr(
398 docsrs,
399 doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
400 )]
401 pub fn enable_alt_timer(&mut self) -> &mut Self {
402 self.enable_time();
403 self.timer_flavor = TimerFlavor::Alternative;
404 self
405 }
406
407 /// Sets the number of worker threads the `Runtime` will use.
408 ///
409 /// This can be any number above 0 though it is advised to keep this value
410 /// on the smaller side.
411 ///
412 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
413 ///
414 /// # Default
415 ///
416 /// The default value is the number of cores available to the system.
417 ///
418 /// When using the `current_thread` runtime this method has no effect.
419 ///
420 /// # Examples
421 ///
422 /// ## Multi threaded runtime with 4 threads
423 ///
424 /// ```
425 /// # #[cfg(not(target_family = "wasm"))]
426 /// # {
427 /// use tokio::runtime;
428 ///
429 /// // This will spawn a work-stealing runtime with 4 worker threads.
430 /// let rt = runtime::Builder::new_multi_thread()
431 /// .worker_threads(4)
432 /// .build()
433 /// .unwrap();
434 ///
435 /// rt.spawn(async move {});
436 /// # }
437 /// ```
438 ///
439 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
440 ///
441 /// ```
442 /// use tokio::runtime;
443 ///
444 /// // Create a runtime that _must_ be driven from a call
445 /// // to `Runtime::block_on`.
446 /// let rt = runtime::Builder::new_current_thread()
447 /// .build()
448 /// .unwrap();
449 ///
450 /// // This will run the runtime and future on the current thread
451 /// rt.block_on(async move {});
452 /// ```
453 ///
454 /// # Panics
455 ///
456 /// This will panic if `val` is not larger than `0`.
457 #[track_caller]
458 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
459 assert!(val > 0, "Worker threads cannot be set to 0");
460 self.worker_threads = Some(val);
461 self
462 }
463
464 /// Specifies the limit for additional threads spawned by the Runtime.
465 ///
466 /// These threads are used for blocking operations like tasks spawned
467 /// through [`spawn_blocking`], this includes but is not limited to:
468 /// - [`fs`] operations
469 /// - dns resolution through [`ToSocketAddrs`]
470 /// - writing to [`Stdout`] or [`Stderr`]
471 /// - reading from [`Stdin`]
472 ///
473 /// Unlike the [`worker_threads`], they are not always active and will exit
474 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
475 ///
476 /// It's recommended to not set this limit too low in order to avoid hanging on operations
477 /// requiring [`spawn_blocking`].
478 ///
479 /// The default value is 512.
480 ///
481 /// # Queue Behavior
482 ///
483 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
484 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
485 /// method has not been reached, a new thread will be spawned. If no idle thread is available
486 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
487 /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
488 /// it could potentially grow unbounded.
489 ///
490 /// # Panics
491 ///
492 /// This will panic if `val` is not larger than `0`.
493 ///
494 /// # Upgrading from 0.x
495 ///
496 /// In old versions `max_threads` limited both blocking and worker threads, but the
497 /// current `max_blocking_threads` does not include async worker threads in the count.
498 ///
499 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
500 /// [`fs`]: mod@crate::fs
501 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
502 /// [`Stdout`]: struct@crate::io::Stdout
503 /// [`Stdin`]: struct@crate::io::Stdin
504 /// [`Stderr`]: struct@crate::io::Stderr
505 /// [`worker_threads`]: Self::worker_threads
506 /// [`thread_keep_alive`]: Self::thread_keep_alive
507 #[track_caller]
508 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
509 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
510 assert!(val > 0, "Max blocking threads cannot be set to 0");
511 self.max_blocking_threads = val;
512 self
513 }
514
515 /// Sets name of threads spawned by the `Runtime`'s thread pool.
516 ///
517 /// The default name is "tokio-runtime-worker".
518 ///
519 /// # Examples
520 ///
521 /// ```
522 /// # #[cfg(not(target_family = "wasm"))]
523 /// # {
524 /// # use tokio::runtime;
525 ///
526 /// # pub fn main() {
527 /// let rt = runtime::Builder::new_multi_thread()
528 /// .thread_name("my-pool")
529 /// .build();
530 /// # }
531 /// # }
532 /// ```
533 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
534 let val = val.into();
535 self.thread_name = std::sync::Arc::new(move || val.clone());
536 self
537 }
538
539 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
540 ///
541 /// The default name fn is `|| "tokio-runtime-worker".into()`.
542 ///
543 /// # Examples
544 ///
545 /// ```
546 /// # #[cfg(not(target_family = "wasm"))]
547 /// # {
548 /// # use tokio::runtime;
549 /// # use std::sync::atomic::{AtomicUsize, Ordering};
550 /// # pub fn main() {
551 /// let rt = runtime::Builder::new_multi_thread()
552 /// .thread_name_fn(|| {
553 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
554 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
555 /// format!("my-pool-{}", id)
556 /// })
557 /// .build();
558 /// # }
559 /// # }
560 /// ```
561 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
562 where
563 F: Fn() -> String + Send + Sync + 'static,
564 {
565 self.thread_name = std::sync::Arc::new(f);
566 self
567 }
568
569 /// Sets the stack size (in bytes) for worker threads.
570 ///
571 /// The actual stack size may be greater than this value if the platform
572 /// specifies minimal stack size.
573 ///
574 /// The default stack size for spawned threads is 2 MiB, though this
575 /// particular stack size is subject to change in the future.
576 ///
577 /// # Examples
578 ///
579 /// ```
580 /// # #[cfg(not(target_family = "wasm"))]
581 /// # {
582 /// # use tokio::runtime;
583 ///
584 /// # pub fn main() {
585 /// let rt = runtime::Builder::new_multi_thread()
586 /// .thread_stack_size(32 * 1024)
587 /// .build();
588 /// # }
589 /// # }
590 /// ```
591 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
592 self.thread_stack_size = Some(val);
593 self
594 }
595
596 /// Executes function `f` after each thread is started but before it starts
597 /// doing work.
598 ///
599 /// This is intended for bookkeeping and monitoring use cases.
600 ///
601 /// # Examples
602 ///
603 /// ```
604 /// # #[cfg(not(target_family = "wasm"))]
605 /// # {
606 /// # use tokio::runtime;
607 /// # pub fn main() {
608 /// let runtime = runtime::Builder::new_multi_thread()
609 /// .on_thread_start(|| {
610 /// println!("thread started");
611 /// })
612 /// .build();
613 /// # }
614 /// # }
615 /// ```
616 #[cfg(not(loom))]
617 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
618 where
619 F: Fn() + Send + Sync + 'static,
620 {
621 self.after_start = Some(std::sync::Arc::new(f));
622 self
623 }
624
625 /// Executes function `f` before each thread stops.
626 ///
627 /// This is intended for bookkeeping and monitoring use cases.
628 ///
629 /// # Examples
630 ///
631 /// ```
632 /// # #[cfg(not(target_family = "wasm"))]
633 /// {
634 /// # use tokio::runtime;
635 /// # pub fn main() {
636 /// let runtime = runtime::Builder::new_multi_thread()
637 /// .on_thread_stop(|| {
638 /// println!("thread stopping");
639 /// })
640 /// .build();
641 /// # }
642 /// # }
643 /// ```
644 #[cfg(not(loom))]
645 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
646 where
647 F: Fn() + Send + Sync + 'static,
648 {
649 self.before_stop = Some(std::sync::Arc::new(f));
650 self
651 }
652
653 /// Executes function `f` just before a thread is parked (goes idle).
654 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
655 /// can be called, and may result in this thread being unparked immediately.
656 ///
657 /// This can be used to start work only when the executor is idle, or for bookkeeping
658 /// and monitoring purposes.
659 ///
660 /// Note: There can only be one park callback for a runtime; calling this function
661 /// more than once replaces the last callback defined, rather than adding to it.
662 ///
663 /// # Examples
664 ///
665 /// ## Multithreaded executor
666 /// ```
667 /// # #[cfg(not(target_family = "wasm"))]
668 /// # {
669 /// # use std::sync::Arc;
670 /// # use std::sync::atomic::{AtomicBool, Ordering};
671 /// # use tokio::runtime;
672 /// # use tokio::sync::Barrier;
673 /// # pub fn main() {
674 /// let once = AtomicBool::new(true);
675 /// let barrier = Arc::new(Barrier::new(2));
676 ///
677 /// let runtime = runtime::Builder::new_multi_thread()
678 /// .worker_threads(1)
679 /// .on_thread_park({
680 /// let barrier = barrier.clone();
681 /// move || {
682 /// let barrier = barrier.clone();
683 /// if once.swap(false, Ordering::Relaxed) {
684 /// tokio::spawn(async move { barrier.wait().await; });
685 /// }
686 /// }
687 /// })
688 /// .build()
689 /// .unwrap();
690 ///
691 /// runtime.block_on(async {
692 /// barrier.wait().await;
693 /// })
694 /// # }
695 /// # }
696 /// ```
697 /// ## Current thread executor
698 /// ```
699 /// # use std::sync::Arc;
700 /// # use std::sync::atomic::{AtomicBool, Ordering};
701 /// # use tokio::runtime;
702 /// # use tokio::sync::Barrier;
703 /// # pub fn main() {
704 /// let once = AtomicBool::new(true);
705 /// let barrier = Arc::new(Barrier::new(2));
706 ///
707 /// let runtime = runtime::Builder::new_current_thread()
708 /// .on_thread_park({
709 /// let barrier = barrier.clone();
710 /// move || {
711 /// let barrier = barrier.clone();
712 /// if once.swap(false, Ordering::Relaxed) {
713 /// tokio::spawn(async move { barrier.wait().await; });
714 /// }
715 /// }
716 /// })
717 /// .build()
718 /// .unwrap();
719 ///
720 /// runtime.block_on(async {
721 /// barrier.wait().await;
722 /// })
723 /// # }
724 /// ```
725 #[cfg(not(loom))]
726 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
727 where
728 F: Fn() + Send + Sync + 'static,
729 {
730 self.before_park = Some(std::sync::Arc::new(f));
731 self
732 }
733
734 /// Executes function `f` just after a thread unparks (starts executing tasks).
735 ///
736 /// This is intended for bookkeeping and monitoring use cases; note that work
737 /// in this callback will increase latencies when the application has allowed one or
738 /// more runtime threads to go idle.
739 ///
740 /// Note: There can only be one unpark callback for a runtime; calling this function
741 /// more than once replaces the last callback defined, rather than adding to it.
742 ///
743 /// # Examples
744 ///
745 /// ```
746 /// # #[cfg(not(target_family = "wasm"))]
747 /// # {
748 /// # use tokio::runtime;
749 /// # pub fn main() {
750 /// let runtime = runtime::Builder::new_multi_thread()
751 /// .on_thread_unpark(|| {
752 /// println!("thread unparking");
753 /// })
754 /// .build();
755 ///
756 /// runtime.unwrap().block_on(async {
757 /// tokio::task::yield_now().await;
758 /// println!("Hello from Tokio!");
759 /// })
760 /// # }
761 /// # }
762 /// ```
763 #[cfg(not(loom))]
764 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
765 where
766 F: Fn() + Send + Sync + 'static,
767 {
768 self.after_unpark = Some(std::sync::Arc::new(f));
769 self
770 }
771
772 /// Executes function `f` just before a task is spawned.
773 ///
774 /// `f` is called within the Tokio context, so functions like
775 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
776 /// invoked immediately.
777 ///
778 /// This can be used for bookkeeping or monitoring purposes.
779 ///
780 /// Note: There can only be one spawn callback for a runtime; calling this function more
781 /// than once replaces the last callback defined, rather than adding to it.
782 ///
783 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
784 ///
785 /// **Note**: This is an [unstable API][unstable]. The public API of this type
786 /// may break in 1.x releases. See [the documentation on unstable
787 /// features][unstable] for details.
788 ///
789 /// [unstable]: crate#unstable-features
790 ///
791 /// # Examples
792 ///
793 /// ```
794 /// # use tokio::runtime;
795 /// # pub fn main() {
796 /// let runtime = runtime::Builder::new_current_thread()
797 /// .on_task_spawn(|_| {
798 /// println!("spawning task");
799 /// })
800 /// .build()
801 /// .unwrap();
802 ///
803 /// runtime.block_on(async {
804 /// tokio::task::spawn(std::future::ready(()));
805 ///
806 /// for _ in 0..64 {
807 /// tokio::task::yield_now().await;
808 /// }
809 /// })
810 /// # }
811 /// ```
812 #[cfg(all(not(loom), tokio_unstable))]
813 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
814 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
815 where
816 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
817 {
818 self.before_spawn = Some(std::sync::Arc::new(f));
819 self
820 }
821
822 /// Executes function `f` just before a task is polled
823 ///
824 /// `f` is called within the Tokio context, so functions like
825 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
826 /// invoked immediately.
827 ///
828 /// **Note**: This is an [unstable API][unstable]. The public API of this type
829 /// may break in 1.x releases. See [the documentation on unstable
830 /// features][unstable] for details.
831 ///
832 /// [unstable]: crate#unstable-features
833 ///
834 /// # Examples
835 ///
836 /// ```
837 /// # #[cfg(not(target_family = "wasm"))]
838 /// # {
839 /// # use std::sync::{atomic::AtomicUsize, Arc};
840 /// # use tokio::task::yield_now;
841 /// # pub fn main() {
842 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
843 /// let poll_start = poll_start_counter.clone();
844 /// let rt = tokio::runtime::Builder::new_multi_thread()
845 /// .enable_all()
846 /// .on_before_task_poll(move |meta| {
847 /// println!("task {} is about to be polled", meta.id())
848 /// })
849 /// .build()
850 /// .unwrap();
851 /// let task = rt.spawn(async {
852 /// yield_now().await;
853 /// });
854 /// let _ = rt.block_on(task);
855 ///
856 /// # }
857 /// # }
858 /// ```
859 #[cfg(tokio_unstable)]
860 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
861 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
862 where
863 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
864 {
865 self.before_poll = Some(std::sync::Arc::new(f));
866 self
867 }
868
869 /// Executes function `f` just after a task is polled
870 ///
871 /// `f` is called within the Tokio context, so functions like
872 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
873 /// invoked immediately.
874 ///
875 /// **Note**: This is an [unstable API][unstable]. The public API of this type
876 /// may break in 1.x releases. See [the documentation on unstable
877 /// features][unstable] for details.
878 ///
879 /// [unstable]: crate#unstable-features
880 ///
881 /// # Examples
882 ///
883 /// ```
884 /// # #[cfg(not(target_family = "wasm"))]
885 /// # {
886 /// # use std::sync::{atomic::AtomicUsize, Arc};
887 /// # use tokio::task::yield_now;
888 /// # pub fn main() {
889 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
890 /// let poll_stop = poll_stop_counter.clone();
891 /// let rt = tokio::runtime::Builder::new_multi_thread()
892 /// .enable_all()
893 /// .on_after_task_poll(move |meta| {
894 /// println!("task {} completed polling", meta.id());
895 /// })
896 /// .build()
897 /// .unwrap();
898 /// let task = rt.spawn(async {
899 /// yield_now().await;
900 /// });
901 /// let _ = rt.block_on(task);
902 ///
903 /// # }
904 /// # }
905 /// ```
906 #[cfg(tokio_unstable)]
907 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
908 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
909 where
910 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
911 {
912 self.after_poll = Some(std::sync::Arc::new(f));
913 self
914 }
915
916 /// Executes function `f` just after a task is terminated.
917 ///
918 /// `f` is called within the Tokio context, so functions like
919 /// [`tokio::spawn`](crate::spawn) can be called.
920 ///
921 /// This can be used for bookkeeping or monitoring purposes.
922 ///
923 /// Note: There can only be one task termination callback for a runtime; calling this
924 /// function more than once replaces the last callback defined, rather than adding to it.
925 ///
926 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
927 ///
928 /// **Note**: This is an [unstable API][unstable]. The public API of this type
929 /// may break in 1.x releases. See [the documentation on unstable
930 /// features][unstable] for details.
931 ///
932 /// [unstable]: crate#unstable-features
933 ///
934 /// # Examples
935 ///
936 /// ```
937 /// # use tokio::runtime;
938 /// # pub fn main() {
939 /// let runtime = runtime::Builder::new_current_thread()
940 /// .on_task_terminate(|_| {
941 /// println!("killing task");
942 /// })
943 /// .build()
944 /// .unwrap();
945 ///
946 /// runtime.block_on(async {
947 /// tokio::task::spawn(std::future::ready(()));
948 ///
949 /// for _ in 0..64 {
950 /// tokio::task::yield_now().await;
951 /// }
952 /// })
953 /// # }
954 /// ```
955 #[cfg(all(not(loom), tokio_unstable))]
956 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
957 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
958 where
959 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
960 {
961 self.after_termination = Some(std::sync::Arc::new(f));
962 self
963 }
964
965 /// Creates the configured `Runtime`.
966 ///
967 /// The returned `Runtime` instance is ready to spawn tasks.
968 ///
969 /// # Examples
970 ///
971 /// ```
972 /// # #[cfg(not(target_family = "wasm"))]
973 /// # {
974 /// use tokio::runtime::Builder;
975 ///
976 /// let rt = Builder::new_multi_thread().build().unwrap();
977 ///
978 /// rt.block_on(async {
979 /// println!("Hello from the Tokio runtime");
980 /// });
981 /// # }
982 /// ```
983 pub fn build(&mut self) -> io::Result<Runtime> {
984 match &self.kind {
985 Kind::CurrentThread => self.build_current_thread_runtime(),
986 #[cfg(feature = "rt-multi-thread")]
987 Kind::MultiThread => self.build_threaded_runtime(),
988 }
989 }
990
991 /// Creates the configured [`LocalRuntime`].
992 ///
993 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
994 ///
995 /// # Panics
996 ///
997 /// This will panic if the runtime is configured with [`new_multi_thread()`].
998 ///
999 /// [`new_multi_thread()`]: Builder::new_multi_thread
1000 ///
1001 /// # Examples
1002 ///
1003 /// ```
1004 /// use tokio::runtime::{Builder, LocalOptions};
1005 ///
1006 /// let rt = Builder::new_current_thread()
1007 /// .build_local(LocalOptions::default())
1008 /// .unwrap();
1009 ///
1010 /// rt.spawn_local(async {
1011 /// println!("Hello from the Tokio runtime");
1012 /// });
1013 /// ```
1014 #[allow(unused_variables, unreachable_patterns)]
1015 #[cfg(tokio_unstable)]
1016 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1017 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1018 match &self.kind {
1019 Kind::CurrentThread => self.build_current_thread_local_runtime(),
1020 #[cfg(feature = "rt-multi-thread")]
1021 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1022 }
1023 }
1024
1025 fn get_cfg(&self) -> driver::Cfg {
1026 driver::Cfg {
1027 enable_pause_time: match self.kind {
1028 Kind::CurrentThread => true,
1029 #[cfg(feature = "rt-multi-thread")]
1030 Kind::MultiThread => false,
1031 },
1032 enable_io: self.enable_io,
1033 enable_time: self.enable_time,
1034 start_paused: self.start_paused,
1035 nevents: self.nevents,
1036 timer_flavor: self.timer_flavor,
1037 }
1038 }
1039
1040 /// Sets a custom timeout for a thread in the blocking pool.
1041 ///
1042 /// By default, the timeout for a thread is set to 10 seconds. This can
1043 /// be overridden using `.thread_keep_alive()`.
1044 ///
1045 /// # Example
1046 ///
1047 /// ```
1048 /// # #[cfg(not(target_family = "wasm"))]
1049 /// # {
1050 /// # use tokio::runtime;
1051 /// # use std::time::Duration;
1052 /// # pub fn main() {
1053 /// let rt = runtime::Builder::new_multi_thread()
1054 /// .thread_keep_alive(Duration::from_millis(100))
1055 /// .build();
1056 /// # }
1057 /// # }
1058 /// ```
1059 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1060 self.keep_alive = Some(duration);
1061 self
1062 }
1063
1064 /// Sets the number of scheduler ticks after which the scheduler will poll the global
1065 /// task queue.
1066 ///
1067 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1068 ///
1069 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1070 /// [the module documentation] for the default behavior of the multi-thread scheduler.
1071 ///
1072 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1073 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1074 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1075 /// getting started on new work, especially if tasks frequently yield rather than complete
1076 /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1077 /// tasks from the local queue will be executed only if the global queue is empty.
1078 /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1079 /// tasks quickly complete polling.
1080 ///
1081 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1082 ///
1083 /// # Panics
1084 ///
1085 /// This function will panic if 0 is passed as an argument.
1086 ///
1087 /// # Examples
1088 ///
1089 /// ```
1090 /// # #[cfg(not(target_family = "wasm"))]
1091 /// # {
1092 /// # use tokio::runtime;
1093 /// # pub fn main() {
1094 /// let rt = runtime::Builder::new_multi_thread()
1095 /// .global_queue_interval(31)
1096 /// .build();
1097 /// # }
1098 /// # }
1099 /// ```
1100 #[track_caller]
1101 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1102 assert!(val > 0, "global_queue_interval must be greater than 0");
1103 self.global_queue_interval = Some(val);
1104 self
1105 }
1106
1107 /// Sets the number of scheduler ticks after which the scheduler will poll for
1108 /// external events (timers, I/O, and so on).
1109 ///
1110 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1111 ///
1112 /// By default, the event interval is `61` for all scheduler types.
1113 ///
1114 /// Setting the event interval determines the effective "priority" of delivering
1115 /// these external events (which may wake up additional tasks), compared to
1116 /// executing tasks that are currently ready to run. A smaller value is useful
1117 /// when tasks frequently spend a long time in polling, or frequently yield,
1118 /// which can result in overly long delays picking up I/O events. Conversely,
1119 /// picking up new events requires extra synchronization and syscall overhead,
1120 /// so if tasks generally complete their polling quickly, a higher event interval
1121 /// will minimize that overhead while still keeping the scheduler responsive to
1122 /// events.
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```
1127 /// # #[cfg(not(target_family = "wasm"))]
1128 /// # {
1129 /// # use tokio::runtime;
1130 /// # pub fn main() {
1131 /// let rt = runtime::Builder::new_multi_thread()
1132 /// .event_interval(31)
1133 /// .build();
1134 /// # }
1135 /// # }
1136 /// ```
1137 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1138 self.event_interval = val;
1139 self
1140 }
1141
1142 cfg_unstable! {
1143 /// Configure how the runtime responds to an unhandled panic on a
1144 /// spawned task.
1145 ///
1146 /// By default, an unhandled panic (i.e. a panic not caught by
1147 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1148 /// execution. The panic's error value is forwarded to the task's
1149 /// [`JoinHandle`] and all other spawned tasks continue running.
1150 ///
1151 /// The `unhandled_panic` option enables configuring this behavior.
1152 ///
1153 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1154 /// spawned tasks have no impact on the runtime's execution.
1155 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1156 /// shutdown immediately when a spawned task panics even if that
1157 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1158 /// will immediately terminate and further calls to
1159 /// [`Runtime::block_on`] will panic.
1160 ///
1161 /// # Panics
1162 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1163 /// on a runtime other than the current thread runtime.
1164 ///
1165 /// # Unstable
1166 ///
1167 /// This option is currently unstable and its implementation is
1168 /// incomplete. The API may change or be removed in the future. See
1169 /// issue [tokio-rs/tokio#4516] for more details.
1170 ///
1171 /// # Examples
1172 ///
1173 /// The following demonstrates a runtime configured to shutdown on
1174 /// panic. The first spawned task panics and results in the runtime
1175 /// shutting down. The second spawned task never has a chance to
1176 /// execute. The call to `block_on` will panic due to the runtime being
1177 /// forcibly shutdown.
1178 ///
1179 /// ```should_panic
1180 /// use tokio::runtime::{self, UnhandledPanic};
1181 ///
1182 /// # pub fn main() {
1183 /// let rt = runtime::Builder::new_current_thread()
1184 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1185 /// .build()
1186 /// .unwrap();
1187 ///
1188 /// rt.spawn(async { panic!("boom"); });
1189 /// rt.spawn(async {
1190 /// // This task never completes.
1191 /// });
1192 ///
1193 /// rt.block_on(async {
1194 /// // Do some work
1195 /// # loop { tokio::task::yield_now().await; }
1196 /// })
1197 /// # }
1198 /// ```
1199 ///
1200 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1201 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1202 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1203 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1204 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1205 }
1206
1207 self.unhandled_panic = behavior;
1208 self
1209 }
1210
1211 /// Disables the LIFO task scheduler heuristic.
1212 ///
1213 /// The multi-threaded scheduler includes a heuristic for optimizing
1214 /// message-passing patterns. This heuristic results in the **last**
1215 /// scheduled task being polled first.
1216 ///
1217 /// To implement this heuristic, each worker thread has a slot which
1218 /// holds the task that should be polled next. However, this slot cannot
1219 /// be stolen by other worker threads, which can result in lower total
1220 /// throughput when tasks tend to have longer poll times.
1221 ///
1222 /// This configuration option will disable this heuristic resulting in
1223 /// all scheduled tasks being pushed into the worker-local queue, which
1224 /// is stealable.
1225 ///
1226 /// Consider trying this option when the task "scheduled" time is high
1227 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1228 /// collect this data.
1229 ///
1230 /// # Unstable
1231 ///
1232 /// This configuration option is considered a workaround for the LIFO
1233 /// slot not being stealable. When the slot becomes stealable, we will
1234 /// revisit whether or not this option is necessary. See
1235 /// issue [tokio-rs/tokio#4941].
1236 ///
1237 /// # Examples
1238 ///
1239 /// ```
1240 /// # #[cfg(not(target_family = "wasm"))]
1241 /// # {
1242 /// use tokio::runtime;
1243 ///
1244 /// let rt = runtime::Builder::new_multi_thread()
1245 /// .disable_lifo_slot()
1246 /// .build()
1247 /// .unwrap();
1248 /// # }
1249 /// ```
1250 ///
1251 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1252 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1253 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1254 self.disable_lifo_slot = true;
1255 self
1256 }
1257
1258 /// Specifies the random number generation seed to use within all
1259 /// threads associated with the runtime being built.
1260 ///
1261 /// This option is intended to make certain parts of the runtime
1262 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1263 /// [`tokio::select!`] it will ensure that the order that branches are
1264 /// polled is deterministic.
1265 ///
1266 /// In addition to the code specifying `rng_seed` and interacting with
1267 /// the runtime, the internals of Tokio and the Rust compiler may affect
1268 /// the sequences of random numbers. In order to ensure repeatable
1269 /// results, the version of Tokio, the versions of all other
1270 /// dependencies that interact with Tokio, and the Rust compiler version
1271 /// should also all remain constant.
1272 ///
1273 /// # Examples
1274 ///
1275 /// ```
1276 /// # use tokio::runtime::{self, RngSeed};
1277 /// # pub fn main() {
1278 /// let seed = RngSeed::from_bytes(b"place your seed here");
1279 /// let rt = runtime::Builder::new_current_thread()
1280 /// .rng_seed(seed)
1281 /// .build();
1282 /// # }
1283 /// ```
1284 ///
1285 /// [`tokio::select!`]: crate::select
1286 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1287 self.seed_generator = RngSeedGenerator::new(seed);
1288 self
1289 }
1290 }
1291
1292 cfg_unstable_metrics! {
1293 /// Enables tracking the distribution of task poll times.
1294 ///
1295 /// Task poll times are not instrumented by default as doing so requires
1296 /// calling [`Instant::now()`] twice per task poll, which could add
1297 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1298 /// metrics data.
1299 ///
1300 /// The histogram uses fixed bucket sizes. In other words, the histogram
1301 /// buckets are not dynamic based on input values. Use the
1302 /// `metrics_poll_time_histogram` builder methods to configure the
1303 /// histogram details.
1304 ///
1305 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1306 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1307 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1308 /// to select [`LogHistogram`] instead.
1309 ///
1310 /// # Examples
1311 ///
1312 /// ```
1313 /// # #[cfg(not(target_family = "wasm"))]
1314 /// # {
1315 /// use tokio::runtime;
1316 ///
1317 /// let rt = runtime::Builder::new_multi_thread()
1318 /// .enable_metrics_poll_time_histogram()
1319 /// .build()
1320 /// .unwrap();
1321 /// # // Test default values here
1322 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1323 /// # let m = rt.handle().metrics();
1324 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1325 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1326 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1327 /// # }
1328 /// ```
1329 ///
1330 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1331 /// [`Instant::now()`]: std::time::Instant::now
1332 /// [`LogHistogram`]: crate::runtime::LogHistogram
1333 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1334 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1335 self.metrics_poll_count_histogram_enable = true;
1336 self
1337 }
1338
1339 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1340 ///
1341 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1342 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1343 #[doc(hidden)]
1344 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1345 self.enable_metrics_poll_time_histogram()
1346 }
1347
1348 /// Sets the histogram scale for tracking the distribution of task poll
1349 /// times.
1350 ///
1351 /// Tracking the distribution of task poll times can be done using a
1352 /// linear or log scale. When using linear scale, each histogram bucket
1353 /// will represent the same range of poll times. When using log scale,
1354 /// each histogram bucket will cover a range twice as big as the
1355 /// previous bucket.
1356 ///
1357 /// **Default:** linear scale.
1358 ///
1359 /// # Examples
1360 ///
1361 /// ```
1362 /// # #[cfg(not(target_family = "wasm"))]
1363 /// # {
1364 /// use tokio::runtime::{self, HistogramScale};
1365 ///
1366 /// # #[allow(deprecated)]
1367 /// let rt = runtime::Builder::new_multi_thread()
1368 /// .enable_metrics_poll_time_histogram()
1369 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1370 /// .build()
1371 /// .unwrap();
1372 /// # }
1373 /// ```
1374 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1375 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1376 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1377 self
1378 }
1379
1380 /// Configure the histogram for tracking poll times
1381 ///
1382 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1383 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1384 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1385 ///
1386 /// # Examples
1387 /// Configure a [`LogHistogram`] with [default configuration]:
1388 /// ```
1389 /// # #[cfg(not(target_family = "wasm"))]
1390 /// # {
1391 /// use tokio::runtime;
1392 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1393 ///
1394 /// let rt = runtime::Builder::new_multi_thread()
1395 /// .enable_metrics_poll_time_histogram()
1396 /// .metrics_poll_time_histogram_configuration(
1397 /// HistogramConfiguration::log(LogHistogram::default())
1398 /// )
1399 /// .build()
1400 /// .unwrap();
1401 /// # }
1402 /// ```
1403 ///
1404 /// Configure a linear histogram with 100 buckets, each 10μs wide
1405 /// ```
1406 /// # #[cfg(not(target_family = "wasm"))]
1407 /// # {
1408 /// use tokio::runtime;
1409 /// use std::time::Duration;
1410 /// use tokio::runtime::HistogramConfiguration;
1411 ///
1412 /// let rt = runtime::Builder::new_multi_thread()
1413 /// .enable_metrics_poll_time_histogram()
1414 /// .metrics_poll_time_histogram_configuration(
1415 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1416 /// )
1417 /// .build()
1418 /// .unwrap();
1419 /// # }
1420 /// ```
1421 ///
1422 /// Configure a [`LogHistogram`] with the following settings:
1423 /// - Measure times from 100ns to 120s
1424 /// - Max error of 0.1
1425 /// - No more than 1024 buckets
1426 /// ```
1427 /// # #[cfg(not(target_family = "wasm"))]
1428 /// # {
1429 /// use std::time::Duration;
1430 /// use tokio::runtime;
1431 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1432 ///
1433 /// let rt = runtime::Builder::new_multi_thread()
1434 /// .enable_metrics_poll_time_histogram()
1435 /// .metrics_poll_time_histogram_configuration(
1436 /// HistogramConfiguration::log(LogHistogram::builder()
1437 /// .max_value(Duration::from_secs(120))
1438 /// .min_value(Duration::from_nanos(100))
1439 /// .max_error(0.1)
1440 /// .max_buckets(1024)
1441 /// .expect("configuration uses 488 buckets")
1442 /// )
1443 /// )
1444 /// .build()
1445 /// .unwrap();
1446 /// # }
1447 /// ```
1448 ///
1449 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1450 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1451 /// where each bucket is twice the size of the previous bucket.
1452 /// ```rust
1453 /// use std::time::Duration;
1454 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1455 /// let rt = tokio::runtime::Builder::new_current_thread()
1456 /// .enable_all()
1457 /// .enable_metrics_poll_time_histogram()
1458 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1459 /// LogHistogram::builder()
1460 /// .min_value(Duration::from_micros(20))
1461 /// .max_value(Duration::from_millis(4))
1462 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1463 /// .precision_exact(0)
1464 /// .max_buckets(10)
1465 /// .unwrap(),
1466 /// ))
1467 /// .build()
1468 /// .unwrap();
1469 /// ```
1470 ///
1471 /// [`LogHistogram`]: crate::runtime::LogHistogram
1472 /// [default configuration]: crate::runtime::LogHistogramBuilder
1473 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1474 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1475 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1476 self
1477 }
1478
1479 /// Sets the histogram resolution for tracking the distribution of task
1480 /// poll times.
1481 ///
1482 /// The resolution is the histogram's first bucket's range. When using a
1483 /// linear histogram scale, each bucket will cover the same range. When
1484 /// using a log scale, each bucket will cover a range twice as big as
1485 /// the previous bucket. In the log case, the resolution represents the
1486 /// smallest bucket range.
1487 ///
1488 /// Note that, when using log scale, the resolution is rounded up to the
1489 /// nearest power of 2 in nanoseconds.
1490 ///
1491 /// **Default:** 100 microseconds.
1492 ///
1493 /// # Examples
1494 ///
1495 /// ```
1496 /// # #[cfg(not(target_family = "wasm"))]
1497 /// # {
1498 /// use tokio::runtime;
1499 /// use std::time::Duration;
1500 ///
1501 /// # #[allow(deprecated)]
1502 /// let rt = runtime::Builder::new_multi_thread()
1503 /// .enable_metrics_poll_time_histogram()
1504 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1505 /// .build()
1506 /// .unwrap();
1507 /// # }
1508 /// ```
1509 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1510 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1511 assert!(resolution > Duration::from_secs(0));
1512 // Sanity check the argument and also make the cast below safe.
1513 assert!(resolution <= Duration::from_secs(1));
1514
1515 let resolution = resolution.as_nanos() as u64;
1516
1517 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1518 self
1519 }
1520
1521 /// Sets the number of buckets for the histogram tracking the
1522 /// distribution of task poll times.
1523 ///
1524 /// The last bucket tracks all greater values that fall out of other
1525 /// ranges. So, configuring the histogram using a linear scale,
1526 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1527 /// polls that take more than 450ms to complete.
1528 ///
1529 /// **Default:** 10
1530 ///
1531 /// # Examples
1532 ///
1533 /// ```
1534 /// # #[cfg(not(target_family = "wasm"))]
1535 /// # {
1536 /// use tokio::runtime;
1537 ///
1538 /// # #[allow(deprecated)]
1539 /// let rt = runtime::Builder::new_multi_thread()
1540 /// .enable_metrics_poll_time_histogram()
1541 /// .metrics_poll_count_histogram_buckets(15)
1542 /// .build()
1543 /// .unwrap();
1544 /// # }
1545 /// ```
1546 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1547 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1548 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1549 self
1550 }
1551 }
1552
1553 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1554 use crate::runtime::runtime::Scheduler;
1555
1556 let (scheduler, handle, blocking_pool) =
1557 self.build_current_thread_runtime_components(None)?;
1558
1559 Ok(Runtime::from_parts(
1560 Scheduler::CurrentThread(scheduler),
1561 handle,
1562 blocking_pool,
1563 ))
1564 }
1565
1566 #[cfg(tokio_unstable)]
1567 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1568 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1569
1570 let tid = std::thread::current().id();
1571
1572 let (scheduler, handle, blocking_pool) =
1573 self.build_current_thread_runtime_components(Some(tid))?;
1574
1575 Ok(LocalRuntime::from_parts(
1576 LocalRuntimeScheduler::CurrentThread(scheduler),
1577 handle,
1578 blocking_pool,
1579 ))
1580 }
1581
1582 fn build_current_thread_runtime_components(
1583 &mut self,
1584 local_tid: Option<ThreadId>,
1585 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1586 use crate::runtime::scheduler;
1587 use crate::runtime::Config;
1588
1589 let mut cfg = self.get_cfg();
1590 cfg.timer_flavor = TimerFlavor::Traditional;
1591 let (driver, driver_handle) = driver::Driver::new(cfg)?;
1592
1593 // Blocking pool
1594 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1595 let blocking_spawner = blocking_pool.spawner().clone();
1596
1597 // Generate a rng seed for this runtime.
1598 let seed_generator_1 = self.seed_generator.next_generator();
1599 let seed_generator_2 = self.seed_generator.next_generator();
1600
1601 // And now put a single-threaded scheduler on top of the timer. When
1602 // there are no futures ready to do something, it'll let the timer or
1603 // the reactor to generate some new stimuli for the futures to continue
1604 // in their life.
1605 let (scheduler, handle) = CurrentThread::new(
1606 driver,
1607 driver_handle,
1608 blocking_spawner,
1609 seed_generator_2,
1610 Config {
1611 before_park: self.before_park.clone(),
1612 after_unpark: self.after_unpark.clone(),
1613 before_spawn: self.before_spawn.clone(),
1614 #[cfg(tokio_unstable)]
1615 before_poll: self.before_poll.clone(),
1616 #[cfg(tokio_unstable)]
1617 after_poll: self.after_poll.clone(),
1618 after_termination: self.after_termination.clone(),
1619 global_queue_interval: self.global_queue_interval,
1620 event_interval: self.event_interval,
1621 #[cfg(tokio_unstable)]
1622 unhandled_panic: self.unhandled_panic.clone(),
1623 disable_lifo_slot: self.disable_lifo_slot,
1624 seed_generator: seed_generator_1,
1625 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1626 },
1627 local_tid,
1628 );
1629
1630 let handle = Handle {
1631 inner: scheduler::Handle::CurrentThread(handle),
1632 };
1633
1634 Ok((scheduler, handle, blocking_pool))
1635 }
1636
1637 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1638 if self.metrics_poll_count_histogram_enable {
1639 Some(self.metrics_poll_count_histogram.clone())
1640 } else {
1641 None
1642 }
1643 }
1644}
1645
1646cfg_io_driver! {
1647 impl Builder {
1648 /// Enables the I/O driver.
1649 ///
1650 /// Doing this enables using net, process, signal, and some I/O types on
1651 /// the runtime.
1652 ///
1653 /// # Examples
1654 ///
1655 /// ```
1656 /// use tokio::runtime;
1657 ///
1658 /// let rt = runtime::Builder::new_multi_thread()
1659 /// .enable_io()
1660 /// .build()
1661 /// .unwrap();
1662 /// ```
1663 pub fn enable_io(&mut self) -> &mut Self {
1664 self.enable_io = true;
1665 self
1666 }
1667
1668 /// Enables the I/O driver and configures the max number of events to be
1669 /// processed per tick.
1670 ///
1671 /// # Examples
1672 ///
1673 /// ```
1674 /// use tokio::runtime;
1675 ///
1676 /// let rt = runtime::Builder::new_current_thread()
1677 /// .enable_io()
1678 /// .max_io_events_per_tick(1024)
1679 /// .build()
1680 /// .unwrap();
1681 /// ```
1682 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1683 self.nevents = capacity;
1684 self
1685 }
1686 }
1687}
1688
1689cfg_time! {
1690 impl Builder {
1691 /// Enables the time driver.
1692 ///
1693 /// Doing this enables using `tokio::time` on the runtime.
1694 ///
1695 /// # Examples
1696 ///
1697 /// ```
1698 /// # #[cfg(not(target_family = "wasm"))]
1699 /// # {
1700 /// use tokio::runtime;
1701 ///
1702 /// let rt = runtime::Builder::new_multi_thread()
1703 /// .enable_time()
1704 /// .build()
1705 /// .unwrap();
1706 /// # }
1707 /// ```
1708 pub fn enable_time(&mut self) -> &mut Self {
1709 self.enable_time = true;
1710 self
1711 }
1712 }
1713}
1714
1715cfg_io_uring! {
1716 impl Builder {
1717 /// Enables the tokio's io_uring driver.
1718 ///
1719 /// Doing this enables using io_uring operations on the runtime.
1720 ///
1721 /// # Examples
1722 ///
1723 /// ```
1724 /// use tokio::runtime;
1725 ///
1726 /// let rt = runtime::Builder::new_multi_thread()
1727 /// .enable_io_uring()
1728 /// .build()
1729 /// .unwrap();
1730 /// ```
1731 #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1732 pub fn enable_io_uring(&mut self) -> &mut Self {
1733 // Currently, the uring flag is equivalent to `enable_io`.
1734 self.enable_io = true;
1735 self
1736 }
1737 }
1738}
1739
1740cfg_test_util! {
1741 impl Builder {
1742 /// Controls if the runtime's clock starts paused or advancing.
1743 ///
1744 /// Pausing time requires the current-thread runtime; construction of
1745 /// the runtime will panic otherwise.
1746 ///
1747 /// # Examples
1748 ///
1749 /// ```
1750 /// use tokio::runtime;
1751 ///
1752 /// let rt = runtime::Builder::new_current_thread()
1753 /// .enable_time()
1754 /// .start_paused(true)
1755 /// .build()
1756 /// .unwrap();
1757 /// ```
1758 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1759 self.start_paused = start_paused;
1760 self
1761 }
1762 }
1763}
1764
1765cfg_rt_multi_thread! {
1766 impl Builder {
1767 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1768 use crate::loom::sys::num_cpus;
1769 use crate::runtime::{Config, runtime::Scheduler};
1770 use crate::runtime::scheduler::{self, MultiThread};
1771
1772 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1773
1774 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1775
1776 // Create the blocking pool
1777 let blocking_pool =
1778 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1779 let blocking_spawner = blocking_pool.spawner().clone();
1780
1781 // Generate a rng seed for this runtime.
1782 let seed_generator_1 = self.seed_generator.next_generator();
1783 let seed_generator_2 = self.seed_generator.next_generator();
1784
1785 let (scheduler, handle, launch) = MultiThread::new(
1786 worker_threads,
1787 driver,
1788 driver_handle,
1789 blocking_spawner,
1790 seed_generator_2,
1791 Config {
1792 before_park: self.before_park.clone(),
1793 after_unpark: self.after_unpark.clone(),
1794 before_spawn: self.before_spawn.clone(),
1795 #[cfg(tokio_unstable)]
1796 before_poll: self.before_poll.clone(),
1797 #[cfg(tokio_unstable)]
1798 after_poll: self.after_poll.clone(),
1799 after_termination: self.after_termination.clone(),
1800 global_queue_interval: self.global_queue_interval,
1801 event_interval: self.event_interval,
1802 #[cfg(tokio_unstable)]
1803 unhandled_panic: self.unhandled_panic.clone(),
1804 disable_lifo_slot: self.disable_lifo_slot,
1805 seed_generator: seed_generator_1,
1806 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1807 },
1808 self.timer_flavor,
1809 );
1810
1811 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1812
1813 // Spawn the thread pool workers
1814 let _enter = handle.enter();
1815 launch.launch();
1816
1817 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1818 }
1819 }
1820}
1821
1822impl fmt::Debug for Builder {
1823 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1824 fmt.debug_struct("Builder")
1825 .field("worker_threads", &self.worker_threads)
1826 .field("max_blocking_threads", &self.max_blocking_threads)
1827 .field(
1828 "thread_name",
1829 &"<dyn Fn() -> String + Send + Sync + 'static>",
1830 )
1831 .field("thread_stack_size", &self.thread_stack_size)
1832 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1833 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1834 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1835 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1836 .finish()
1837 }
1838}