tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33cfg_io_uring! {
34 #[cfg(test)]
35 use super::mocks::spawn;
36 #[cfg(not(test))]
37 use crate::spawn;
38}
39
40/// A reference to an open file on the filesystem.
41///
42/// This is a specialized version of [`std::fs::File`] for usage from the
43/// Tokio runtime.
44///
45/// An instance of a `File` can be read and/or written depending on what options
46/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
47/// cursor that the file contains internally.
48///
49/// A file will not be closed immediately when it goes out of scope if there
50/// are any IO operations that have not yet completed. To ensure that a file is
51/// closed immediately when it is dropped, you should call [`flush`] before
52/// dropping it. Note that this does not ensure that the file has been fully
53/// written to disk; the operating system might keep the changes around in an
54/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
55/// the data to disk.
56///
57/// Reading and writing to a `File` is usually done using the convenience
58/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
59///
60/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
61/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
62/// [`sync_all`]: fn@crate::fs::File::sync_all
63/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
64/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
65///
66/// # Examples
67///
68/// Create a new file and asynchronously write bytes to it:
69///
70/// ```no_run
71/// use tokio::fs::File;
72/// use tokio::io::AsyncWriteExt; // for write_all()
73///
74/// # async fn dox() -> std::io::Result<()> {
75/// let mut file = File::create("foo.txt").await?;
76/// file.write_all(b"hello, world!").await?;
77/// # Ok(())
78/// # }
79/// ```
80///
81/// Read the contents of a file into a buffer:
82///
83/// ```no_run
84/// use tokio::fs::File;
85/// use tokio::io::AsyncReadExt; // for read_to_end()
86///
87/// # async fn dox() -> std::io::Result<()> {
88/// let mut file = File::open("foo.txt").await?;
89///
90/// let mut contents = vec![];
91/// file.read_to_end(&mut contents).await?;
92///
93/// println!("len = {}", contents.len());
94/// # Ok(())
95/// # }
96/// ```
pub struct File {
    // Shared handle to the underlying file; clones of this `Arc` are moved
    // into the closures that perform the actual file operations.
    std: Arc<StdFile>,
    // Buffer, in-flight operation and cached seek position. Accessed through
    // `lock().await` from `&self` methods and `get_mut()` from `&mut self`.
    inner: Mutex<Inner>,
    // Upper bound passed to the buffer when filling it for a read or write.
    max_buf_size: usize,
}
102
struct Inner {
    /// Either idle (holding the reusable buffer) or busy with one
    /// background operation.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Position reported by the most recent successful seek. Returned by
    /// `poll_complete` when no operation is in flight.
    pos: u64,
}
113
/// Whether a background operation currently owns the buffer.
#[derive(Debug)]
enum State {
    /// No operation in flight. The buffer is `take()`n out of the `Option`
    /// when it is handed to a new operation.
    Idle(Option<Buf>),
    /// An operation is running; its handle yields the outcome together with
    /// the buffer so the buffer can be put back into `Idle`.
    Busy(JoinHandle<(Operation, Buf)>),
}
119
/// Outcome of a completed background operation, tagged by its kind.
#[derive(Debug)]
enum Operation {
    /// Result of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Result of writing the buffer to the file.
    Write(io::Result<()>),
    /// Result of a seek; `set_len` also reports its result through this variant.
    Seek(io::Result<u64>),
}
126
127impl File {
128 /// Attempts to open a file in read-only mode.
129 ///
130 /// See [`OpenOptions`] for more details.
131 ///
132 /// # Errors
133 ///
134 /// This function will return an error if called from outside of the Tokio
135 /// runtime or if path does not already exist. Other errors may also be
136 /// returned according to `OpenOptions::open`.
137 ///
138 /// # Examples
139 ///
140 /// ```no_run
141 /// use tokio::fs::File;
142 /// use tokio::io::AsyncReadExt;
143 ///
144 /// # async fn dox() -> std::io::Result<()> {
145 /// let mut file = File::open("foo.txt").await?;
146 ///
147 /// let mut contents = vec![];
148 /// file.read_to_end(&mut contents).await?;
149 ///
150 /// println!("len = {}", contents.len());
151 /// # Ok(())
152 /// # }
153 /// ```
154 ///
155 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
156 ///
157 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
158 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
159 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
160 Self::options().read(true).open(path).await
161 }
162
163 /// Opens a file in write-only mode.
164 ///
165 /// This function will create a file if it does not exist, and will truncate
166 /// it if it does.
167 ///
168 /// See [`OpenOptions`] for more details.
169 ///
170 /// # Errors
171 ///
172 /// Results in an error if called from outside of the Tokio runtime or if
173 /// the underlying [`create`] call results in an error.
174 ///
175 /// [`create`]: std::fs::File::create
176 ///
177 /// # Examples
178 ///
179 /// ```no_run
180 /// use tokio::fs::File;
181 /// use tokio::io::AsyncWriteExt;
182 ///
183 /// # async fn dox() -> std::io::Result<()> {
184 /// let mut file = File::create("foo.txt").await?;
185 /// file.write_all(b"hello, world!").await?;
186 /// # Ok(())
187 /// # }
188 /// ```
189 ///
190 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
191 ///
192 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
193 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
194 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
195 Self::options()
196 .write(true)
197 .create(true)
198 .truncate(true)
199 .open(path)
200 .await
201 }
202
203 /// Opens a file in read-write mode.
204 ///
205 /// This function will create a file if it does not exist, or return an error
206 /// if it does. This way, if the call succeeds, the file returned is guaranteed
207 /// to be new.
208 ///
209 /// This option is useful because it is atomic. Otherwise between checking
210 /// whether a file exists and creating a new one, the file may have been
211 /// created by another process (a TOCTOU race condition / attack).
212 ///
213 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
214 ///
215 /// See [`OpenOptions`] for more details.
216 ///
217 /// # Examples
218 ///
219 /// ```no_run
220 /// use tokio::fs::File;
221 /// use tokio::io::AsyncWriteExt;
222 ///
223 /// # async fn dox() -> std::io::Result<()> {
224 /// let mut file = File::create_new("foo.txt").await?;
225 /// file.write_all(b"hello, world!").await?;
226 /// # Ok(())
227 /// # }
228 /// ```
229 ///
230 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
231 ///
232 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
233 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
234 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
235 Self::options()
236 .read(true)
237 .write(true)
238 .create_new(true)
239 .open(path)
240 .await
241 }
242
243 /// Returns a new [`OpenOptions`] object.
244 ///
245 /// This function returns a new `OpenOptions` object that you can use to
246 /// open or create a file with specific options if `open()` or `create()`
247 /// are not appropriate.
248 ///
249 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
250 /// readable code. Instead of
251 /// `OpenOptions::new().append(true).open("example.log")`,
252 /// you can write `File::options().append(true).open("example.log")`. This
253 /// also avoids the need to import `OpenOptions`.
254 ///
255 /// See the [`OpenOptions::new`] function for more details.
256 ///
257 /// # Examples
258 ///
259 /// ```no_run
260 /// use tokio::fs::File;
261 /// use tokio::io::AsyncWriteExt;
262 ///
263 /// # async fn dox() -> std::io::Result<()> {
264 /// let mut f = File::options().append(true).open("example.log").await?;
265 /// f.write_all(b"new line\n").await?;
266 /// # Ok(())
267 /// # }
268 /// ```
    #[must_use]
    pub fn options() -> OpenOptions {
        // Plain forwarding constructor; no options are preset here.
        OpenOptions::new()
    }
273
274 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
275 ///
276 /// # Examples
277 ///
278 /// ```no_run
279 /// // This line could block. It is not recommended to do this on the Tokio
280 /// // runtime.
281 /// let std_file = std::fs::File::open("foo.txt").unwrap();
282 /// let file = tokio::fs::File::from_std(std_file);
283 /// ```
284 pub fn from_std(std: StdFile) -> File {
285 File {
286 std: Arc::new(std),
287 inner: Mutex::new(Inner {
288 state: State::Idle(Some(Buf::with_capacity(0))),
289 last_write_err: None,
290 pos: 0,
291 }),
292 max_buf_size: DEFAULT_MAX_BUF_SIZE,
293 }
294 }
295
296 /// Attempts to sync all OS-internal metadata to disk.
297 ///
298 /// This function will attempt to ensure that all in-core data reaches the
299 /// filesystem before returning.
300 ///
301 /// # Examples
302 ///
303 /// ```no_run
304 /// use tokio::fs::File;
305 /// use tokio::io::AsyncWriteExt;
306 ///
307 /// # async fn dox() -> std::io::Result<()> {
308 /// let mut file = File::create("foo.txt").await?;
309 /// file.write_all(b"hello, world!").await?;
310 /// file.sync_all().await?;
311 /// # Ok(())
312 /// # }
313 /// ```
314 ///
315 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
316 ///
317 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
318 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
319 pub async fn sync_all(&self) -> io::Result<()> {
320 let mut inner = self.inner.lock().await;
321 inner.complete_inflight().await;
322
323 let std = self.std.clone();
324 asyncify(move || std.sync_all()).await
325 }
326
327 /// This function is similar to `sync_all`, except that it may not
328 /// synchronize file metadata to the filesystem.
329 ///
330 /// This is intended for use cases that must synchronize content, but don't
331 /// need the metadata on disk. The goal of this method is to reduce disk
332 /// operations.
333 ///
334 /// Note that some platforms may simply implement this in terms of `sync_all`.
335 ///
336 /// # Examples
337 ///
338 /// ```no_run
339 /// use tokio::fs::File;
340 /// use tokio::io::AsyncWriteExt;
341 ///
342 /// # async fn dox() -> std::io::Result<()> {
343 /// let mut file = File::create("foo.txt").await?;
344 /// file.write_all(b"hello, world!").await?;
345 /// file.sync_data().await?;
346 /// # Ok(())
347 /// # }
348 /// ```
349 ///
350 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
351 ///
352 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
353 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
354 pub async fn sync_data(&self) -> io::Result<()> {
355 let mut inner = self.inner.lock().await;
356 inner.complete_inflight().await;
357
358 let std = self.std.clone();
359 asyncify(move || std.sync_data()).await
360 }
361
    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
363 ///
364 /// If the size is less than the current file's size, then the file will be
365 /// shrunk. If it is greater than the current file's size, then the file
366 /// will be extended to size and have all of the intermediate data filled in
367 /// with 0s.
368 ///
369 /// # Errors
370 ///
371 /// This function will return an error if the file is not opened for
372 /// writing.
373 ///
374 /// # Examples
375 ///
376 /// ```no_run
377 /// use tokio::fs::File;
378 /// use tokio::io::AsyncWriteExt;
379 ///
380 /// # async fn dox() -> std::io::Result<()> {
381 /// let mut file = File::create("foo.txt").await?;
382 /// file.write_all(b"hello, world!").await?;
383 /// file.set_len(10).await?;
384 /// # Ok(())
385 /// # }
386 /// ```
387 ///
388 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
389 ///
390 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
391 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
392 pub async fn set_len(&self, size: u64) -> io::Result<()> {
393 let mut inner = self.inner.lock().await;
394 inner.complete_inflight().await;
395
396 let mut buf = match inner.state {
397 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
398 _ => unreachable!(),
399 };
400
401 let seek = if !buf.is_empty() {
402 Some(SeekFrom::Current(buf.discard_read()))
403 } else {
404 None
405 };
406
407 let std = self.std.clone();
408
409 inner.state = State::Busy(spawn_blocking(move || {
410 let res = if let Some(seek) = seek {
411 (&*std).seek(seek).and_then(|_| std.set_len(size))
412 } else {
413 std.set_len(size)
414 }
415 .map(|()| 0); // the value is discarded later
416
417 // Return the result as a seek
418 (Operation::Seek(res), buf)
419 }));
420
421 let (op, buf) = match inner.state {
422 State::Idle(_) => unreachable!(),
423 State::Busy(ref mut rx) => rx.await?,
424 };
425
426 inner.state = State::Idle(Some(buf));
427
428 match op {
429 Operation::Seek(res) => res.map(|pos| {
430 inner.pos = pos;
431 }),
432 _ => unreachable!(),
433 }
434 }
435
436 /// Queries metadata about the underlying file.
437 ///
438 /// # Examples
439 ///
440 /// ```no_run
441 /// use tokio::fs::File;
442 ///
443 /// # async fn dox() -> std::io::Result<()> {
444 /// let file = File::open("foo.txt").await?;
445 /// let metadata = file.metadata().await?;
446 ///
447 /// println!("{:?}", metadata);
448 /// # Ok(())
449 /// # }
450 /// ```
451 pub async fn metadata(&self) -> io::Result<Metadata> {
452 let std = self.std.clone();
453 asyncify(move || std.metadata()).await
454 }
455
456 /// Creates a new `File` instance that shares the same underlying file handle
457 /// as the existing `File` instance. Reads, writes, and seeks will affect both
458 /// File instances simultaneously.
459 ///
460 /// # Examples
461 ///
462 /// ```no_run
463 /// use tokio::fs::File;
464 ///
465 /// # async fn dox() -> std::io::Result<()> {
466 /// let file = File::open("foo.txt").await?;
467 /// let file_clone = file.try_clone().await?;
468 /// # Ok(())
469 /// # }
470 /// ```
471 pub async fn try_clone(&self) -> io::Result<File> {
472 self.inner.lock().await.complete_inflight().await;
473 let std = self.std.clone();
474 let std_file = asyncify(move || std.try_clone()).await?;
475 let mut file = File::from_std(std_file);
476 file.set_max_buf_size(self.max_buf_size);
477 Ok(file)
478 }
479
480 /// Destructures `File` into a [`std::fs::File`]. This function is
481 /// async to allow any in-flight operations to complete.
482 ///
483 /// Use `File::try_into_std` to attempt conversion immediately.
484 ///
485 /// # Examples
486 ///
487 /// ```no_run
488 /// use tokio::fs::File;
489 ///
490 /// # async fn dox() -> std::io::Result<()> {
491 /// let tokio_file = File::open("foo.txt").await?;
492 /// let std_file = tokio_file.into_std().await;
493 /// # Ok(())
494 /// # }
495 /// ```
496 pub async fn into_std(mut self) -> StdFile {
497 self.inner.get_mut().complete_inflight().await;
498 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
499 }
500
501 /// Tries to immediately destructure `File` into a [`std::fs::File`].
502 ///
503 /// # Errors
504 ///
505 /// This function will return an error containing the file if some
506 /// operation is in-flight.
507 ///
508 /// # Examples
509 ///
510 /// ```no_run
511 /// use tokio::fs::File;
512 ///
513 /// # async fn dox() -> std::io::Result<()> {
514 /// let tokio_file = File::open("foo.txt").await?;
515 /// let std_file = tokio_file.try_into_std().unwrap();
516 /// # Ok(())
517 /// # }
518 /// ```
519 #[allow(clippy::result_large_err)]
520 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
521 match Arc::try_unwrap(self.std) {
522 Ok(file) => Ok(file),
523 Err(std_file_arc) => {
524 self.std = std_file_arc;
525 Err(self)
526 }
527 }
528 }
529
530 /// Changes the permissions on the underlying file.
531 ///
532 /// # Platform-specific behavior
533 ///
534 /// This function currently corresponds to the `fchmod` function on Unix and
535 /// the `SetFileInformationByHandle` function on Windows. Note that, this
536 /// [may change in the future][changes].
537 ///
538 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
539 ///
540 /// # Errors
541 ///
    /// This function will return an error if the user lacks permission to change
543 /// attributes on the underlying file. It may also return an error in other
544 /// os-specific unspecified cases.
545 ///
546 /// # Examples
547 ///
548 /// ```no_run
549 /// use tokio::fs::File;
550 ///
551 /// # async fn dox() -> std::io::Result<()> {
552 /// let file = File::open("foo.txt").await?;
553 /// let mut perms = file.metadata().await?.permissions();
554 /// perms.set_readonly(true);
555 /// file.set_permissions(perms).await?;
556 /// # Ok(())
557 /// # }
558 /// ```
559 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
560 let std = self.std.clone();
561 asyncify(move || std.set_permissions(perm)).await
562 }
563
564 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
565 ///
566 /// Although Tokio uses a sensible default value for this buffer size, this function would be
567 /// useful for changing that default depending on the situation.
568 ///
569 /// # Examples
570 ///
571 /// ```no_run
572 /// use tokio::fs::File;
573 /// use tokio::io::AsyncWriteExt;
574 ///
575 /// # async fn dox() -> std::io::Result<()> {
    /// let mut file = File::create("foo.txt").await?;
577 ///
578 /// // Set maximum buffer size to 8 MiB
579 /// file.set_max_buf_size(8 * 1024 * 1024);
580 ///
581 /// let mut buf = vec![1; 1024 * 1024 * 1024];
582 ///
583 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
584 /// file.write_all(&mut buf).await?;
585 /// # Ok(())
586 /// # }
587 /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Stored as-is; it is read when the next read or write fills the buffer.
        self.max_buf_size = max_buf_size;
    }
591
592 /// Get the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
    pub fn max_buf_size(&self) -> usize {
        // Plain accessor for the stored limit.
        self.max_buf_size
    }
596}
597
impl AsyncRead for File {
    /// Copies buffered bytes into `dst`, starting a background read to
    /// refill the buffer when it is empty.
    ///
    /// Any other operation still in flight (write or seek) is driven to
    /// completion first. A failed write observed here is remembered in
    /// `last_write_err` rather than returned from the read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));

        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Serve from already-buffered bytes, or return at once
                    // when the caller has no room left in `dst`.
                    if !buf.is_empty() || dst.remaining() == 0 {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    let std = me.std.clone();

                    // Request no more than `dst` can take, capped by the
                    // configured maximum buffer size.
                    let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
                    inner.state = State::Busy(spawn_blocking(move || {
                        // SAFETY: the `Read` implementation of `std` does not
                        // read from the buffer it is borrowing and correctly
                        // reports the length of the data written into the buffer.
                        let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
                        (Operation::Read(res), buf)
                    }));
                }
                State::Busy(ref mut rx) => {
                    // A failure to join the background task is returned via `?`.
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // Keep the error kind for the next write/flush.
                            // There is no explicit `continue`: control leaves
                            // the match and the loop iterates again.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            // A failed seek is ignored here; only a successful
                            // one updates the cached position.
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
670
671impl AsyncSeek for File {
672 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
673 let me = self.get_mut();
674 let inner = me.inner.get_mut();
675
676 match inner.state {
677 State::Busy(_) => Err(io::Error::new(
678 io::ErrorKind::Other,
679 "other file operation is pending, call poll_complete before start_seek",
680 )),
681 State::Idle(ref mut buf_cell) => {
682 let mut buf = buf_cell.take().unwrap();
683
684 // Factor in any unread data from the buf
685 if !buf.is_empty() {
686 let n = buf.discard_read();
687
688 if let SeekFrom::Current(ref mut offset) = pos {
689 *offset += n;
690 }
691 }
692
693 let std = me.std.clone();
694
695 inner.state = State::Busy(spawn_blocking(move || {
696 let res = (&*std).seek(pos);
697 (Operation::Seek(res), buf)
698 }));
699 Ok(())
700 }
701 }
702 }
703
704 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
705 ready!(crate::trace::trace_leaf(cx));
706 let inner = self.inner.get_mut();
707
708 loop {
709 match inner.state {
710 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
711 State::Busy(ref mut rx) => {
712 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
713 inner.state = State::Idle(Some(buf));
714
715 match op {
716 Operation::Read(_) => {}
717 Operation::Write(Err(e)) => {
718 assert!(inner.last_write_err.is_none());
719 inner.last_write_err = Some(e.kind());
720 }
721 Operation::Write(_) => {}
722 Operation::Seek(res) => {
723 if let Ok(pos) = res {
724 inner.pos = pos;
725 }
726 return Poll::Ready(res);
727 }
728 }
729 }
730 }
731 }
732 }
733}
734
735impl AsyncWrite for File {
736 fn poll_write(
737 self: Pin<&mut Self>,
738 cx: &mut Context<'_>,
739 src: &[u8],
740 ) -> Poll<io::Result<usize>> {
741 ready!(crate::trace::trace_leaf(cx));
742 let me = self.get_mut();
743 let inner = me.inner.get_mut();
744
745 if let Some(e) = inner.last_write_err.take() {
746 return Poll::Ready(Err(e.into()));
747 }
748
749 loop {
750 match inner.state {
751 State::Idle(ref mut buf_cell) => {
752 let mut buf = buf_cell.take().unwrap();
753
754 let seek = if !buf.is_empty() {
755 Some(SeekFrom::Current(buf.discard_read()))
756 } else {
757 None
758 };
759
760 let n = buf.copy_from(src, me.max_buf_size);
761 let std = me.std.clone();
762
763 #[allow(unused_mut)]
764 let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?;
765
766 inner.state = State::Busy(task_join_handle);
767
768 return Poll::Ready(Ok(n));
769 }
770 State::Busy(ref mut rx) => {
771 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
772 inner.state = State::Idle(Some(buf));
773
774 match op {
775 Operation::Read(_) => {
776 // We don't care about the result here. The fact
777 // that the cursor has advanced will be reflected in
778 // the next iteration of the loop
779 continue;
780 }
781 Operation::Write(res) => {
782 // If the previous write was successful, continue.
783 // Otherwise, error.
784 res?;
785 continue;
786 }
787 Operation::Seek(_) => {
788 // Ignore the seek
789 continue;
790 }
791 }
792 }
793 }
794 }
795 }
796
797 fn poll_write_vectored(
798 self: Pin<&mut Self>,
799 cx: &mut Context<'_>,
800 bufs: &[io::IoSlice<'_>],
801 ) -> Poll<Result<usize, io::Error>> {
802 ready!(crate::trace::trace_leaf(cx));
803 let me = self.get_mut();
804 let inner = me.inner.get_mut();
805
806 if let Some(e) = inner.last_write_err.take() {
807 return Poll::Ready(Err(e.into()));
808 }
809
810 loop {
811 match inner.state {
812 State::Idle(ref mut buf_cell) => {
813 let mut buf = buf_cell.take().unwrap();
814
815 let seek = if !buf.is_empty() {
816 Some(SeekFrom::Current(buf.discard_read()))
817 } else {
818 None
819 };
820
821 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
822 let std = me.std.clone();
823
824 #[allow(unused_mut)]
825 let mut data = Some((std, buf));
826
827 let mut task_join_handle: Option<JoinHandle<_>> = None;
828
829 #[cfg(all(
830 tokio_unstable,
831 feature = "io-uring",
832 feature = "rt",
833 feature = "fs",
834 target_os = "linux"
835 ))]
836 {
837 use crate::runtime::Handle;
838
839 // Handle not present in some tests?
840 if let Ok(handle) = Handle::try_current() {
841 if handle.inner.driver().io().check_and_init()? {
842 task_join_handle = {
843 use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op};
844
845 let (std, mut buf) = data.take().unwrap();
846 if let Some(seek) = seek {
847 // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset
848 // seeking only modifies kernel metadata and does not block, so we can do it here
849 (&*std).seek(seek).map_err(|e| {
850 io::Error::new(
851 e.kind(),
852 format!("failed to seek before write: {e}"),
853 )
854 })?;
855 }
856
857 let mut fd: ArcFd = std;
858 let handle = spawn(async move {
859 loop {
860 let op = Op::write_at(fd, buf, u64::MAX);
861 let (r, _buf, _fd) = op.await;
862 buf = _buf;
863 fd = _fd;
864 match r {
865 Ok(_) if buf.is_empty() => {
866 break (Operation::Write(Ok(())), buf);
867 }
868 Ok(0) => {
869 break (
870 Operation::Write(Err(
871 io::ErrorKind::WriteZero.into(),
872 )),
873 buf,
874 );
875 }
876 Ok(_) => continue, // more to write
877 Err(e) => break (Operation::Write(Err(e)), buf),
878 }
879 }
880 });
881
882 Some(handle)
883 };
884 }
885 }
886 }
887
888 if let Some((std, mut buf)) = data {
889 task_join_handle = Some(
890 spawn_mandatory_blocking(move || {
891 let res = if let Some(seek) = seek {
892 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
893 } else {
894 buf.write_to(&mut &*std)
895 };
896
897 (Operation::Write(res), buf)
898 })
899 .ok_or_else(|| {
900 io::Error::new(io::ErrorKind::Other, "background task failed")
901 })?,
902 );
903 }
904
905 inner.state = State::Busy(task_join_handle.unwrap());
906
907 return Poll::Ready(Ok(n));
908 }
909 State::Busy(ref mut rx) => {
910 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
911 inner.state = State::Idle(Some(buf));
912
913 match op {
914 Operation::Read(_) => {
915 // We don't care about the result here. The fact
916 // that the cursor has advanced will be reflected in
917 // the next iteration of the loop
918 continue;
919 }
920 Operation::Write(res) => {
921 // If the previous write was successful, continue.
922 // Otherwise, error.
923 res?;
924 continue;
925 }
926 Operation::Seek(_) => {
927 // Ignore the seek
928 continue;
929 }
930 }
931 }
932 }
933 }
934 }
935
    fn is_write_vectored(&self) -> bool {
        // `poll_write_vectored` is implemented above, so report support.
        true
    }
939
940 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
941 ready!(crate::trace::trace_leaf(cx));
942 let inner = self.inner.get_mut();
943 inner.poll_flush(cx)
944 }
945
946 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
947 ready!(crate::trace::trace_leaf(cx));
948 self.poll_flush(cx)
949 }
950}
951
952impl From<StdFile> for File {
953 fn from(std: StdFile) -> Self {
954 Self::from_std(std)
955 }
956}
957
958impl fmt::Debug for File {
959 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
960 fmt.debug_struct("tokio::fs::File")
961 .field("std", &self.std)
962 .finish()
963 }
964}
965
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    /// Returns the raw descriptor of the wrapped file handle.
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
972
#[cfg(unix)]
impl std::os::unix::io::AsFd for File {
    /// Borrows the descriptor of the wrapped file handle for the lifetime
    /// of `&self`.
    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
        // SAFETY: the raw fd comes from `self.std` (via `as_raw_fd`), and the
        // returned borrow is tied to `&self`, which keeps `self.std` alive.
        // NOTE(review): relies on the wrapped handle keeping the descriptor
        // open for as long as it lives - confirm for the mock `StdFile`.
        unsafe {
            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
        }
    }
}
981
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
    /// Builds a `File` from a raw descriptor by first constructing the
    /// wrapped file type from it and then converting with `into()`.
    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
        // Safety: exactly the same safety contract as
        // `std::os::unix::io::FromRawFd::from_raw_fd`.
        unsafe { StdFile::from_raw_fd(fd).into() }
    }
}
990
cfg_windows! {
    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};

    impl AsRawHandle for File {
        /// Returns the raw handle of the wrapped file.
        fn as_raw_handle(&self) -> RawHandle {
            self.std.as_raw_handle()
        }
    }

    impl AsHandle for File {
        /// Borrows the handle of the wrapped file for the lifetime of `&self`.
        fn as_handle(&self) -> BorrowedHandle<'_> {
            // SAFETY: the raw handle comes from `self.std` (via
            // `as_raw_handle`), and the returned borrow is tied to `&self`,
            // which keeps `self.std` alive.
            // NOTE(review): relies on the wrapped file keeping the handle
            // open for as long as it lives - confirm.
            unsafe {
                BorrowedHandle::borrow_raw(
                    AsRawHandle::as_raw_handle(self),
                )
            }
        }
    }

    impl FromRawHandle for File {
        /// Builds a `File` from a raw handle by first constructing the
        /// wrapped file type from it and then converting with `into()`.
        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
            // Safety: exactly the same safety contract as
            // `FromRawHandle::from_raw_handle`.
            unsafe { StdFile::from_raw_handle(handle).into() }
        }
    }
}
1018
1019impl Inner {
1020 async fn complete_inflight(&mut self) {
1021 use std::future::poll_fn;
1022
1023 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
1024 }
1025
1026 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1027 ready!(crate::trace::trace_leaf(cx));
1028 match self.poll_flush(cx) {
1029 Poll::Ready(Err(e)) => {
1030 self.last_write_err = Some(e.kind());
1031 Poll::Ready(())
1032 }
1033 Poll::Ready(Ok(())) => Poll::Ready(()),
1034 Poll::Pending => Poll::Pending,
1035 }
1036 }
1037
1038 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1039 if let Some(e) = self.last_write_err.take() {
1040 return Poll::Ready(Err(e.into()));
1041 }
1042
1043 let (op, buf) = match self.state {
1044 State::Idle(_) => return Poll::Ready(Ok(())),
1045 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
1046 };
1047
1048 // The buffer is not used here
1049 self.state = State::Idle(Some(buf));
1050
1051 match op {
1052 Operation::Read(_) => Poll::Ready(Ok(())),
1053 Operation::Write(res) => Poll::Ready(res),
1054 Operation::Seek(_) => Poll::Ready(Ok(())),
1055 }
1056 }
1057
    /// Starts a background write of the given buffer to the given file and
    /// returns the handle of the spawned task.
    ///
    /// When `seek` is `Some`, the file is repositioned before the write.
    /// Under the io-uring cfg, if a runtime handle is available and
    /// `check_and_init()` reports `true`, the write is issued through
    /// `Op::write_at` in a spawned async task. In every other case it runs
    /// through `spawn_mandatory_blocking`.
    ///
    /// # Errors
    ///
    /// Returns an error if `check_and_init()` fails, if the pre-write seek on
    /// the io-uring path fails, or if `spawn_mandatory_blocking` returns
    /// `None` ("background task failed").
    fn poll_write_inner(
        &self,
        data: (Arc<StdFile>, Buf),
        seek: Option<SeekFrom>,
    ) -> io::Result<JoinHandle<(Operation, Buf)>> {
        // Wrapped in `Option` so the io-uring path can `take()` it; whatever
        // is left afterwards is handled by the blocking fallback below.
        #[allow(unused_mut)]
        let mut data = Some(data);
        let mut task_join_handle = None;

        #[cfg(all(
            tokio_unstable,
            feature = "io-uring",
            feature = "rt",
            feature = "fs",
            target_os = "linux"
        ))]
        {
            use crate::runtime::Handle;

            // Handle not present in some tests?
            if let Ok(handle) = Handle::try_current() {
                if handle.inner.driver().io().check_and_init()? {
                    task_join_handle = {
                        use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op};

                        let (std, mut buf) = data.take().unwrap();
                        if let Some(seek) = seek {
                            // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset
                            // seeking only modifies kernel metadata and does not block, so we can do it here
                            (&*std).seek(seek).map_err(|e| {
                                io::Error::new(
                                    e.kind(),
                                    format!("failed to seek before write: {e}"),
                                )
                            })?;
                        }

                        let mut fd: ArcFd = std;
                        let handle = spawn(async move {
                            // Keep issuing writes until the buffer is empty,
                            // a write reports zero bytes, or an error occurs.
                            loop {
                                let op = Op::write_at(fd, buf, u64::MAX);
                                let (r, _buf, _fd) = op.await;
                                buf = _buf;
                                fd = _fd;
                                match r {
                                    Ok(_) if buf.is_empty() => {
                                        break (Operation::Write(Ok(())), buf);
                                    }
                                    Ok(0) => {
                                        break (
                                            Operation::Write(Err(io::ErrorKind::WriteZero.into())),
                                            buf,
                                        );
                                    }

                                    Ok(_) => continue, // more to write
                                    Err(e) => break (Operation::Write(Err(e)), buf),
                                }
                            }
                        });

                        Some(handle)
                    };
                }
            }
        }

        // Fallback: `data` is still present when the io-uring path was
        // compiled out or not taken.
        if let Some((std, mut buf)) = data {
            task_join_handle = {
                let handle = spawn_mandatory_blocking(move || {
                    let res = if let Some(seek) = seek {
                        (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
                    } else {
                        buf.write_to(&mut &*std)
                    };

                    (Operation::Write(res), buf)
                })
                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "background task failed"))?;

                Some(handle)
            };
        }

        // One of the two paths above has set the handle by this point.
        Ok(task_join_handle.unwrap())
    }
1144}
1145
1146#[cfg(test)]
1147mod tests;