tokio/fs/
file.rs

1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33cfg_io_uring! {
34    #[cfg(test)]
35    use super::mocks::spawn;
36    #[cfg(not(test))]
37    use crate::spawn;
38}
39
/// A reference to an open file on the filesystem.
///
/// This is a specialized version of [`std::fs::File`] for usage from the
/// Tokio runtime.
///
/// An instance of a `File` can be read and/or written depending on what options
/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
/// cursor that the file contains internally.
///
/// A file will not be closed immediately when it goes out of scope if there
/// are any IO operations that have not yet completed. To ensure that a file is
/// closed immediately when it is dropped, you should call [`flush`] before
/// dropping it. Note that this does not ensure that the file has been fully
/// written to disk; the operating system might keep the changes around in an
/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
/// the data to disk.
///
/// Reading and writing to a `File` is usually done using the convenience
/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
///
/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
/// [`sync_all`]: fn@crate::fs::File::sync_all
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
///
/// # Examples
///
/// Create a new file and asynchronously write bytes to it:
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::AsyncWriteExt; // for write_all()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// # Ok(())
/// # }
/// ```
///
/// Read the contents of a file into a buffer:
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::AsyncReadExt; // for read_to_end()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
///
/// let mut contents = vec![];
/// file.read_to_end(&mut contents).await?;
///
/// println!("len = {}", contents.len());
/// # Ok(())
/// # }
/// ```
pub struct File {
    /// Shared handle to the underlying file. A clone of this `Arc` is moved
    /// into every closure that operates on the file in the background.
    std: Arc<StdFile>,
    /// Buffer and operation state. `&self` methods take the async lock;
    /// the poll methods, which have exclusive access, use `get_mut`.
    inner: Mutex<Inner>,
    /// Upper bound on how many bytes are buffered for a single read or write.
    max_buf_size: usize,
}
102
/// Mutable state of a [`File`], stored behind its `inner` mutex.
struct Inner {
    /// Whether the file is idle or has an operation running in the background.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Offset taken from the most recent successful `Operation::Seek` result.
    /// `poll_complete` returns it when no operation is pending.
    pos: u64,
}
113
/// Whether a background operation currently owns the file's buffer.
#[derive(Debug)]
enum State {
    /// No operation is pending. The buffer is kept in an `Option` so it can be
    /// taken out and moved into the next operation.
    Idle(Option<Buf>),
    /// An operation is running; its join handle yields the operation's outcome
    /// together with the buffer so the file can return to `Idle`.
    Busy(JoinHandle<(Operation, Buf)>),
}
119
/// Outcome of a completed background operation, tagged by its kind.
#[derive(Debug)]
enum Operation {
    /// Result of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Result of writing the buffer out to the file.
    Write(io::Result<()>),
    /// Result of a seek; on success the payload is stored in `Inner::pos`.
    Seek(io::Result<u64>),
}
126
127impl File {
128    /// Attempts to open a file in read-only mode.
129    ///
130    /// See [`OpenOptions`] for more details.
131    ///
132    /// # Errors
133    ///
134    /// This function will return an error if called from outside of the Tokio
135    /// runtime or if path does not already exist. Other errors may also be
136    /// returned according to `OpenOptions::open`.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// use tokio::fs::File;
142    /// use tokio::io::AsyncReadExt;
143    ///
144    /// # async fn dox() -> std::io::Result<()> {
145    /// let mut file = File::open("foo.txt").await?;
146    ///
147    /// let mut contents = vec![];
148    /// file.read_to_end(&mut contents).await?;
149    ///
150    /// println!("len = {}", contents.len());
151    /// # Ok(())
152    /// # }
153    /// ```
154    ///
155    /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
156    ///
157    /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
158    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
159    pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
160        Self::options().read(true).open(path).await
161    }
162
163    /// Opens a file in write-only mode.
164    ///
165    /// This function will create a file if it does not exist, and will truncate
166    /// it if it does.
167    ///
168    /// See [`OpenOptions`] for more details.
169    ///
170    /// # Errors
171    ///
172    /// Results in an error if called from outside of the Tokio runtime or if
173    /// the underlying [`create`] call results in an error.
174    ///
175    /// [`create`]: std::fs::File::create
176    ///
177    /// # Examples
178    ///
179    /// ```no_run
180    /// use tokio::fs::File;
181    /// use tokio::io::AsyncWriteExt;
182    ///
183    /// # async fn dox() -> std::io::Result<()> {
184    /// let mut file = File::create("foo.txt").await?;
185    /// file.write_all(b"hello, world!").await?;
186    /// # Ok(())
187    /// # }
188    /// ```
189    ///
190    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
191    ///
192    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
193    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
194    pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
195        Self::options()
196            .write(true)
197            .create(true)
198            .truncate(true)
199            .open(path)
200            .await
201    }
202
203    /// Opens a file in read-write mode.
204    ///
205    /// This function will create a file if it does not exist, or return an error
206    /// if it does. This way, if the call succeeds, the file returned is guaranteed
207    /// to be new.
208    ///
209    /// This option is useful because it is atomic. Otherwise between checking
210    /// whether a file exists and creating a new one, the file may have been
211    /// created by another process (a TOCTOU race condition / attack).
212    ///
213    /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
214    ///
215    /// See [`OpenOptions`] for more details.
216    ///
217    /// # Examples
218    ///
219    /// ```no_run
220    /// use tokio::fs::File;
221    /// use tokio::io::AsyncWriteExt;
222    ///
223    /// # async fn dox() -> std::io::Result<()> {
224    /// let mut file = File::create_new("foo.txt").await?;
225    /// file.write_all(b"hello, world!").await?;
226    /// # Ok(())
227    /// # }
228    /// ```
229    ///
230    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
231    ///
232    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
233    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
234    pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
235        Self::options()
236            .read(true)
237            .write(true)
238            .create_new(true)
239            .open(path)
240            .await
241    }
242
    /// Returns a new [`OpenOptions`] object.
    ///
    /// Use the returned `OpenOptions` to open or create a file with specific
    /// options when `open()` or `create()` are not appropriate.
    ///
    /// It is equivalent to `OpenOptions::new()`, but allows you to write more
    /// readable code. Instead of
    /// `OpenOptions::new().append(true).open("example.log")`,
    /// you can write `File::options().append(true).open("example.log")`. This
    /// also avoids the need to import `OpenOptions`.
    ///
    /// See the [`OpenOptions::new`] function for more details.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::fs::File;
    /// use tokio::io::AsyncWriteExt;
    ///
    /// # async fn dox() -> std::io::Result<()> {
    /// let mut f = File::options().append(true).open("example.log").await?;
    /// f.write_all(b"new line\n").await?;
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn options() -> OpenOptions {
        OpenOptions::new()
    }
273
274    /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
275    ///
276    /// # Examples
277    ///
278    /// ```no_run
279    /// // This line could block. It is not recommended to do this on the Tokio
280    /// // runtime.
281    /// let std_file = std::fs::File::open("foo.txt").unwrap();
282    /// let file = tokio::fs::File::from_std(std_file);
283    /// ```
284    pub fn from_std(std: StdFile) -> File {
285        File {
286            std: Arc::new(std),
287            inner: Mutex::new(Inner {
288                state: State::Idle(Some(Buf::with_capacity(0))),
289                last_write_err: None,
290                pos: 0,
291            }),
292            max_buf_size: DEFAULT_MAX_BUF_SIZE,
293        }
294    }
295
296    /// Attempts to sync all OS-internal metadata to disk.
297    ///
298    /// This function will attempt to ensure that all in-core data reaches the
299    /// filesystem before returning.
300    ///
301    /// # Examples
302    ///
303    /// ```no_run
304    /// use tokio::fs::File;
305    /// use tokio::io::AsyncWriteExt;
306    ///
307    /// # async fn dox() -> std::io::Result<()> {
308    /// let mut file = File::create("foo.txt").await?;
309    /// file.write_all(b"hello, world!").await?;
310    /// file.sync_all().await?;
311    /// # Ok(())
312    /// # }
313    /// ```
314    ///
315    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
316    ///
317    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
318    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
319    pub async fn sync_all(&self) -> io::Result<()> {
320        let mut inner = self.inner.lock().await;
321        inner.complete_inflight().await;
322
323        let std = self.std.clone();
324        asyncify(move || std.sync_all()).await
325    }
326
327    /// This function is similar to `sync_all`, except that it may not
328    /// synchronize file metadata to the filesystem.
329    ///
330    /// This is intended for use cases that must synchronize content, but don't
331    /// need the metadata on disk. The goal of this method is to reduce disk
332    /// operations.
333    ///
334    /// Note that some platforms may simply implement this in terms of `sync_all`.
335    ///
336    /// # Examples
337    ///
338    /// ```no_run
339    /// use tokio::fs::File;
340    /// use tokio::io::AsyncWriteExt;
341    ///
342    /// # async fn dox() -> std::io::Result<()> {
343    /// let mut file = File::create("foo.txt").await?;
344    /// file.write_all(b"hello, world!").await?;
345    /// file.sync_data().await?;
346    /// # Ok(())
347    /// # }
348    /// ```
349    ///
350    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
351    ///
352    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
353    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
354    pub async fn sync_data(&self) -> io::Result<()> {
355        let mut inner = self.inner.lock().await;
356        inner.complete_inflight().await;
357
358        let std = self.std.clone();
359        asyncify(move || std.sync_data()).await
360    }
361
362    /// Truncates or extends the underlying file, updating the size of this file to become size.
363    ///
364    /// If the size is less than the current file's size, then the file will be
365    /// shrunk. If it is greater than the current file's size, then the file
366    /// will be extended to size and have all of the intermediate data filled in
367    /// with 0s.
368    ///
369    /// # Errors
370    ///
371    /// This function will return an error if the file is not opened for
372    /// writing.
373    ///
374    /// # Examples
375    ///
376    /// ```no_run
377    /// use tokio::fs::File;
378    /// use tokio::io::AsyncWriteExt;
379    ///
380    /// # async fn dox() -> std::io::Result<()> {
381    /// let mut file = File::create("foo.txt").await?;
382    /// file.write_all(b"hello, world!").await?;
383    /// file.set_len(10).await?;
384    /// # Ok(())
385    /// # }
386    /// ```
387    ///
388    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
389    ///
390    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
391    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
392    pub async fn set_len(&self, size: u64) -> io::Result<()> {
393        let mut inner = self.inner.lock().await;
394        inner.complete_inflight().await;
395
396        let mut buf = match inner.state {
397            State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
398            _ => unreachable!(),
399        };
400
401        let seek = if !buf.is_empty() {
402            Some(SeekFrom::Current(buf.discard_read()))
403        } else {
404            None
405        };
406
407        let std = self.std.clone();
408
409        inner.state = State::Busy(spawn_blocking(move || {
410            let res = if let Some(seek) = seek {
411                (&*std).seek(seek).and_then(|_| std.set_len(size))
412            } else {
413                std.set_len(size)
414            }
415            .map(|()| 0); // the value is discarded later
416
417            // Return the result as a seek
418            (Operation::Seek(res), buf)
419        }));
420
421        let (op, buf) = match inner.state {
422            State::Idle(_) => unreachable!(),
423            State::Busy(ref mut rx) => rx.await?,
424        };
425
426        inner.state = State::Idle(Some(buf));
427
428        match op {
429            Operation::Seek(res) => res.map(|pos| {
430                inner.pos = pos;
431            }),
432            _ => unreachable!(),
433        }
434    }
435
436    /// Queries metadata about the underlying file.
437    ///
438    /// # Examples
439    ///
440    /// ```no_run
441    /// use tokio::fs::File;
442    ///
443    /// # async fn dox() -> std::io::Result<()> {
444    /// let file = File::open("foo.txt").await?;
445    /// let metadata = file.metadata().await?;
446    ///
447    /// println!("{:?}", metadata);
448    /// # Ok(())
449    /// # }
450    /// ```
451    pub async fn metadata(&self) -> io::Result<Metadata> {
452        let std = self.std.clone();
453        asyncify(move || std.metadata()).await
454    }
455
456    /// Creates a new `File` instance that shares the same underlying file handle
457    /// as the existing `File` instance. Reads, writes, and seeks will affect both
458    /// File instances simultaneously.
459    ///
460    /// # Examples
461    ///
462    /// ```no_run
463    /// use tokio::fs::File;
464    ///
465    /// # async fn dox() -> std::io::Result<()> {
466    /// let file = File::open("foo.txt").await?;
467    /// let file_clone = file.try_clone().await?;
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub async fn try_clone(&self) -> io::Result<File> {
472        self.inner.lock().await.complete_inflight().await;
473        let std = self.std.clone();
474        let std_file = asyncify(move || std.try_clone()).await?;
475        let mut file = File::from_std(std_file);
476        file.set_max_buf_size(self.max_buf_size);
477        Ok(file)
478    }
479
480    /// Destructures `File` into a [`std::fs::File`]. This function is
481    /// async to allow any in-flight operations to complete.
482    ///
483    /// Use `File::try_into_std` to attempt conversion immediately.
484    ///
485    /// # Examples
486    ///
487    /// ```no_run
488    /// use tokio::fs::File;
489    ///
490    /// # async fn dox() -> std::io::Result<()> {
491    /// let tokio_file = File::open("foo.txt").await?;
492    /// let std_file = tokio_file.into_std().await;
493    /// # Ok(())
494    /// # }
495    /// ```
496    pub async fn into_std(mut self) -> StdFile {
497        self.inner.get_mut().complete_inflight().await;
498        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
499    }
500
501    /// Tries to immediately destructure `File` into a [`std::fs::File`].
502    ///
503    /// # Errors
504    ///
505    /// This function will return an error containing the file if some
506    /// operation is in-flight.
507    ///
508    /// # Examples
509    ///
510    /// ```no_run
511    /// use tokio::fs::File;
512    ///
513    /// # async fn dox() -> std::io::Result<()> {
514    /// let tokio_file = File::open("foo.txt").await?;
515    /// let std_file = tokio_file.try_into_std().unwrap();
516    /// # Ok(())
517    /// # }
518    /// ```
519    #[allow(clippy::result_large_err)]
520    pub fn try_into_std(mut self) -> Result<StdFile, Self> {
521        match Arc::try_unwrap(self.std) {
522            Ok(file) => Ok(file),
523            Err(std_file_arc) => {
524                self.std = std_file_arc;
525                Err(self)
526            }
527        }
528    }
529
530    /// Changes the permissions on the underlying file.
531    ///
532    /// # Platform-specific behavior
533    ///
534    /// This function currently corresponds to the `fchmod` function on Unix and
535    /// the `SetFileInformationByHandle` function on Windows. Note that, this
536    /// [may change in the future][changes].
537    ///
538    /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
539    ///
540    /// # Errors
541    ///
542    /// This function will return an error if the user lacks permission change
543    /// attributes on the underlying file. It may also return an error in other
544    /// os-specific unspecified cases.
545    ///
546    /// # Examples
547    ///
548    /// ```no_run
549    /// use tokio::fs::File;
550    ///
551    /// # async fn dox() -> std::io::Result<()> {
552    /// let file = File::open("foo.txt").await?;
553    /// let mut perms = file.metadata().await?.permissions();
554    /// perms.set_readonly(true);
555    /// file.set_permissions(perms).await?;
556    /// # Ok(())
557    /// # }
558    /// ```
559    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
560        let std = self.std.clone();
561        asyncify(move || std.set_permissions(perm)).await
562    }
563
    /// Sets the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
    ///
    /// Tokio picks a sensible default for this buffer size; this function
    /// lets you override that default when the situation calls for it. The
    /// value caps how many bytes a single read or write moves through the
    /// internal buffer.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::fs::File;
    /// use tokio::io::AsyncWriteExt;
    ///
    /// # async fn dox() -> std::io::Result<()> {
    /// let mut file = File::create("foo.txt").await?;
    ///
    /// // Set maximum buffer size to 8 MiB
    /// file.set_max_buf_size(8 * 1024 * 1024);
    ///
    /// let buf = vec![1; 1024 * 1024 * 1024];
    ///
    /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
    /// file.write_all(&buf).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        self.max_buf_size = max_buf_size;
    }
591
    /// Returns the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
    ///
    /// This is the value last passed to `set_max_buf_size`, or the default
    /// assigned by `from_std` if it was never changed.
    pub fn max_buf_size(&self) -> usize {
        self.max_buf_size
    }
596}
597
impl AsyncRead for File {
    // Reads into `dst`, serving from the internal buffer when it holds data
    // and otherwise filling the buffer with a read spawned via
    // `spawn_blocking`. An operation that is already in flight is driven to
    // completion first; its outcome decides whether the loop retries.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));

        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Leftover buffered bytes are handed out before touching
                    // the file again. A `dst` without room also completes
                    // immediately, without spawning a read.
                    if !buf.is_empty() || dst.remaining() == 0 {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    let std = me.std.clone();

                    // Never request more than the caller can take, nor more
                    // than the configured cap.
                    let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
                    inner.state = State::Busy(spawn_blocking(move || {
                        // SAFETY: the `Read` implementation of `std` does not
                        // read from the buffer it is borrowing and correctly
                        // reports the length of the data written into the buffer.
                        let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
                        (Operation::Read(res), buf)
                    }));
                    // Loop around so the freshly spawned read gets polled.
                }
                State::Busy(ref mut rx) => {
                    // A join error is converted by `?` and returned as-is.
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            // An earlier write finished; retry the read.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // A write failure is not reported by a read. Only
                            // its kind is kept, for the next write or flush,
                            // and the loop then retries the read.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            // Record a successful seek; a failed one is
                            // dropped here. Either way, retry the read.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
670
671impl AsyncSeek for File {
672    fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
673        let me = self.get_mut();
674        let inner = me.inner.get_mut();
675
676        match inner.state {
677            State::Busy(_) => Err(io::Error::new(
678                io::ErrorKind::Other,
679                "other file operation is pending, call poll_complete before start_seek",
680            )),
681            State::Idle(ref mut buf_cell) => {
682                let mut buf = buf_cell.take().unwrap();
683
684                // Factor in any unread data from the buf
685                if !buf.is_empty() {
686                    let n = buf.discard_read();
687
688                    if let SeekFrom::Current(ref mut offset) = pos {
689                        *offset += n;
690                    }
691                }
692
693                let std = me.std.clone();
694
695                inner.state = State::Busy(spawn_blocking(move || {
696                    let res = (&*std).seek(pos);
697                    (Operation::Seek(res), buf)
698                }));
699                Ok(())
700            }
701        }
702    }
703
704    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
705        ready!(crate::trace::trace_leaf(cx));
706        let inner = self.inner.get_mut();
707
708        loop {
709            match inner.state {
710                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
711                State::Busy(ref mut rx) => {
712                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
713                    inner.state = State::Idle(Some(buf));
714
715                    match op {
716                        Operation::Read(_) => {}
717                        Operation::Write(Err(e)) => {
718                            assert!(inner.last_write_err.is_none());
719                            inner.last_write_err = Some(e.kind());
720                        }
721                        Operation::Write(_) => {}
722                        Operation::Seek(res) => {
723                            if let Ok(pos) = res {
724                                inner.pos = pos;
725                            }
726                            return Poll::Ready(res);
727                        }
728                    }
729                }
730            }
731        }
732    }
733}
734
735impl AsyncWrite for File {
736    fn poll_write(
737        self: Pin<&mut Self>,
738        cx: &mut Context<'_>,
739        src: &[u8],
740    ) -> Poll<io::Result<usize>> {
741        ready!(crate::trace::trace_leaf(cx));
742        let me = self.get_mut();
743        let inner = me.inner.get_mut();
744
745        if let Some(e) = inner.last_write_err.take() {
746            return Poll::Ready(Err(e.into()));
747        }
748
749        loop {
750            match inner.state {
751                State::Idle(ref mut buf_cell) => {
752                    let mut buf = buf_cell.take().unwrap();
753
754                    let seek = if !buf.is_empty() {
755                        Some(SeekFrom::Current(buf.discard_read()))
756                    } else {
757                        None
758                    };
759
760                    let n = buf.copy_from(src, me.max_buf_size);
761                    let std = me.std.clone();
762
763                    #[allow(unused_mut)]
764                    let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?;
765
766                    inner.state = State::Busy(task_join_handle);
767
768                    return Poll::Ready(Ok(n));
769                }
770                State::Busy(ref mut rx) => {
771                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
772                    inner.state = State::Idle(Some(buf));
773
774                    match op {
775                        Operation::Read(_) => {
776                            // We don't care about the result here. The fact
777                            // that the cursor has advanced will be reflected in
778                            // the next iteration of the loop
779                            continue;
780                        }
781                        Operation::Write(res) => {
782                            // If the previous write was successful, continue.
783                            // Otherwise, error.
784                            res?;
785                            continue;
786                        }
787                        Operation::Seek(_) => {
788                            // Ignore the seek
789                            continue;
790                        }
791                    }
792                }
793            }
794        }
795    }
796
797    fn poll_write_vectored(
798        self: Pin<&mut Self>,
799        cx: &mut Context<'_>,
800        bufs: &[io::IoSlice<'_>],
801    ) -> Poll<Result<usize, io::Error>> {
802        ready!(crate::trace::trace_leaf(cx));
803        let me = self.get_mut();
804        let inner = me.inner.get_mut();
805
806        if let Some(e) = inner.last_write_err.take() {
807            return Poll::Ready(Err(e.into()));
808        }
809
810        loop {
811            match inner.state {
812                State::Idle(ref mut buf_cell) => {
813                    let mut buf = buf_cell.take().unwrap();
814
815                    let seek = if !buf.is_empty() {
816                        Some(SeekFrom::Current(buf.discard_read()))
817                    } else {
818                        None
819                    };
820
821                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
822                    let std = me.std.clone();
823
824                    #[allow(unused_mut)]
825                    let mut data = Some((std, buf));
826
827                    let mut task_join_handle: Option<JoinHandle<_>> = None;
828
829                    #[cfg(all(
830                        tokio_unstable,
831                        feature = "io-uring",
832                        feature = "rt",
833                        feature = "fs",
834                        target_os = "linux"
835                    ))]
836                    {
837                        use crate::runtime::Handle;
838
839                        // Handle not present in some tests?
840                        if let Ok(handle) = Handle::try_current() {
841                            if handle.inner.driver().io().check_and_init()? {
842                                task_join_handle = {
843                                    use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op};
844
845                                    let (std, mut buf) = data.take().unwrap();
846                                    if let Some(seek) = seek {
847                                        // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset
848                                        // seeking only modifies kernel metadata and does not block, so we can do it here
849                                        (&*std).seek(seek).map_err(|e| {
850                                            io::Error::new(
851                                                e.kind(),
852                                                format!("failed to seek before write: {e}"),
853                                            )
854                                        })?;
855                                    }
856
857                                    let mut fd: ArcFd = std;
858                                    let handle = spawn(async move {
859                                        loop {
860                                            let op = Op::write_at(fd, buf, u64::MAX);
861                                            let (r, _buf, _fd) = op.await;
862                                            buf = _buf;
863                                            fd = _fd;
864                                            match r {
865                                                Ok(_) if buf.is_empty() => {
866                                                    break (Operation::Write(Ok(())), buf);
867                                                }
868                                                Ok(0) => {
869                                                    break (
870                                                        Operation::Write(Err(
871                                                            io::ErrorKind::WriteZero.into(),
872                                                        )),
873                                                        buf,
874                                                    );
875                                                }
876                                                Ok(_) => continue, // more to write
877                                                Err(e) => break (Operation::Write(Err(e)), buf),
878                                            }
879                                        }
880                                    });
881
882                                    Some(handle)
883                                };
884                            }
885                        }
886                    }
887
888                    if let Some((std, mut buf)) = data {
889                        task_join_handle = Some(
890                            spawn_mandatory_blocking(move || {
891                                let res = if let Some(seek) = seek {
892                                    (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
893                                } else {
894                                    buf.write_to(&mut &*std)
895                                };
896
897                                (Operation::Write(res), buf)
898                            })
899                            .ok_or_else(|| {
900                                io::Error::new(io::ErrorKind::Other, "background task failed")
901                            })?,
902                        );
903                    }
904
905                    inner.state = State::Busy(task_join_handle.unwrap());
906
907                    return Poll::Ready(Ok(n));
908                }
909                State::Busy(ref mut rx) => {
910                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
911                    inner.state = State::Idle(Some(buf));
912
913                    match op {
914                        Operation::Read(_) => {
915                            // We don't care about the result here. The fact
916                            // that the cursor has advanced will be reflected in
917                            // the next iteration of the loop
918                            continue;
919                        }
920                        Operation::Write(res) => {
921                            // If the previous write was successful, continue.
922                            // Otherwise, error.
923                            res?;
924                            continue;
925                        }
926                        Operation::Seek(_) => {
927                            // Ignore the seek
928                            continue;
929                        }
930                    }
931                }
932            }
933        }
934    }
935
    /// Unconditionally returns `true`.
    ///
    /// NOTE(review): presumably this advertises the vectored write path
    /// implemented elsewhere in this impl — confirm against the trait docs.
    fn is_write_vectored(&self) -> bool {
        true
    }
939
940    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
941        ready!(crate::trace::trace_leaf(cx));
942        let inner = self.inner.get_mut();
943        inner.poll_flush(cx)
944    }
945
    /// Shuts the writer down by flushing it.
    ///
    /// After the trace-leaf check, this delegates to `poll_flush` and returns
    /// whatever that call returns; no additional work is done here.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        ready!(crate::trace::trace_leaf(cx));
        self.poll_flush(cx)
    }
950}
951
952impl From<StdFile> for File {
953    fn from(std: StdFile) -> Self {
954        Self::from_std(std)
955    }
956}
957
958impl fmt::Debug for File {
959    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
960        fmt.debug_struct("tokio::fs::File")
961            .field("std", &self.std)
962            .finish()
963    }
964}
965
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    /// Returns the raw file descriptor reported by the wrapped `std` handle.
    ///
    /// The descriptor is only read here; nothing in this method changes who
    /// holds it.
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
972
973#[cfg(unix)]
974impl std::os::unix::io::AsFd for File {
975    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
976        unsafe {
977            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
978        }
979    }
980}
981
982#[cfg(unix)]
983impl std::os::unix::io::FromRawFd for File {
984    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
985        // Safety: exactly the same safety contract as
986        // `std::os::unix::io::FromRawFd::from_raw_fd`.
987        unsafe { StdFile::from_raw_fd(fd).into() }
988    }
989}
990
991cfg_windows! {
992    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
993
994    impl AsRawHandle for File {
995        fn as_raw_handle(&self) -> RawHandle {
996            self.std.as_raw_handle()
997        }
998    }
999
1000    impl AsHandle for File {
1001        fn as_handle(&self) -> BorrowedHandle<'_> {
1002            unsafe {
1003                BorrowedHandle::borrow_raw(
1004                    AsRawHandle::as_raw_handle(self),
1005                )
1006            }
1007        }
1008    }
1009
1010    impl FromRawHandle for File {
1011        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
1012            // Safety: exactly the same safety contract as
1013            // `FromRawHandle::from_raw_handle`.
1014            unsafe { StdFile::from_raw_handle(handle).into() }
1015        }
1016    }
1017}
1018
1019impl Inner {
1020    async fn complete_inflight(&mut self) {
1021        use std::future::poll_fn;
1022
1023        poll_fn(|cx| self.poll_complete_inflight(cx)).await;
1024    }
1025
1026    fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
1027        ready!(crate::trace::trace_leaf(cx));
1028        match self.poll_flush(cx) {
1029            Poll::Ready(Err(e)) => {
1030                self.last_write_err = Some(e.kind());
1031                Poll::Ready(())
1032            }
1033            Poll::Ready(Ok(())) => Poll::Ready(()),
1034            Poll::Pending => Poll::Pending,
1035        }
1036    }
1037
1038    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1039        if let Some(e) = self.last_write_err.take() {
1040            return Poll::Ready(Err(e.into()));
1041        }
1042
1043        let (op, buf) = match self.state {
1044            State::Idle(_) => return Poll::Ready(Ok(())),
1045            State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
1046        };
1047
1048        // The buffer is not used here
1049        self.state = State::Idle(Some(buf));
1050
1051        match op {
1052            Operation::Read(_) => Poll::Ready(Ok(())),
1053            Operation::Write(res) => Poll::Ready(res),
1054            Operation::Seek(_) => Poll::Ready(Ok(())),
1055        }
1056    }
1057
1058    fn poll_write_inner(
1059        &self,
1060        data: (Arc<StdFile>, Buf),
1061        seek: Option<SeekFrom>,
1062    ) -> io::Result<JoinHandle<(Operation, Buf)>> {
1063        #[allow(unused_mut)]
1064        let mut data = Some(data);
1065        let mut task_join_handle = None;
1066
1067        #[cfg(all(
1068            tokio_unstable,
1069            feature = "io-uring",
1070            feature = "rt",
1071            feature = "fs",
1072            target_os = "linux"
1073        ))]
1074        {
1075            use crate::runtime::Handle;
1076
1077            // Handle not present in some tests?
1078            if let Ok(handle) = Handle::try_current() {
1079                if handle.inner.driver().io().check_and_init()? {
1080                    task_join_handle = {
1081                        use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op};
1082
1083                        let (std, mut buf) = data.take().unwrap();
1084                        if let Some(seek) = seek {
1085                            // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset
1086                            // seeking only modifies kernel metadata and does not block, so we can do it here
1087                            (&*std).seek(seek).map_err(|e| {
1088                                io::Error::new(
1089                                    e.kind(),
1090                                    format!("failed to seek before write: {e}"),
1091                                )
1092                            })?;
1093                        }
1094
1095                        let mut fd: ArcFd = std;
1096                        let handle = spawn(async move {
1097                            loop {
1098                                let op = Op::write_at(fd, buf, u64::MAX);
1099                                let (r, _buf, _fd) = op.await;
1100                                buf = _buf;
1101                                fd = _fd;
1102                                match r {
1103                                    Ok(_) if buf.is_empty() => {
1104                                        break (Operation::Write(Ok(())), buf);
1105                                    }
1106                                    Ok(0) => {
1107                                        break (
1108                                            Operation::Write(Err(io::ErrorKind::WriteZero.into())),
1109                                            buf,
1110                                        );
1111                                    }
1112
1113                                    Ok(_) => continue, // more to write
1114                                    Err(e) => break (Operation::Write(Err(e)), buf),
1115                                }
1116                            }
1117                        });
1118
1119                        Some(handle)
1120                    };
1121                }
1122            }
1123        }
1124
1125        if let Some((std, mut buf)) = data {
1126            task_join_handle = {
1127                let handle = spawn_mandatory_blocking(move || {
1128                    let res = if let Some(seek) = seek {
1129                        (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
1130                    } else {
1131                        buf.write_to(&mut &*std)
1132                    };
1133
1134                    (Operation::Write(res), buf)
1135                })
1136                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "background task failed"))?;
1137
1138                Some(handle)
1139            };
1140        }
1141
1142        Ok(task_join_handle.unwrap())
1143    }
1144}
1145
1146#[cfg(test)]
1147mod tests;