-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy path_file_operations.py
More file actions
225 lines (203 loc) · 8.49 KB
/
_file_operations.py
File metadata and controls
225 lines (203 loc) · 8.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""File operations helpers."""
from __future__ import annotations
from typing import Optional
class _ODataFileOperations:
    """Provides file management capabilities including upload, download, and delete operations.

    The methods use three members that are not defined in this class and must
    be supplied by the concrete client it is combined with:

    * ``self.api`` - URL prefix every request URL is built from.
    * ``self._format_key(record_id)`` - returns the key segment that is
      appended directly after the entity set name.
    * ``self._request(method, url, ...)`` - performs the HTTP call and returns
      a response exposing ``headers`` and ``content``.
    """

    # Size threshold separating single-request uploads from chunked uploads (128 MiB).
    _SMALL_UPLOAD_LIMIT_BYTES = 128 * 1024 * 1024
    # Chunk size used when the server sends no usable ``x-ms-chunk-size`` header (4 MiB).
    _DEFAULT_CHUNK_SIZE_BYTES = 4 * 1024 * 1024

    def _upload_file(
        self,
        entity_set: str,
        record_id: str,
        file_name_attribute: str,
        path: str,
        mode: Optional[str] = None,
        mime_type: Optional[str] = None,
        if_none_match: bool = True,
    ) -> None:
        """Upload a file to a Dataverse file column with automatic method selection.

        Parameters
        ----------
        entity_set : :class:`str`
            Target entity set (plural logical name), e.g. "accounts".
        record_id : :class:`str`
            GUID of the target record.
        file_name_attribute : :class:`str`
            Logical name of the file column attribute.
        path : :class:`str`
            Local filesystem path to the file.
        mode : :class:`str` | None
            Upload strategy: "auto" (default), "small", or "chunk". Matching is
            case-insensitive. "auto" picks "small" for files below 128 MiB and
            "chunk" otherwise.
        mime_type : :class:`str` | None
            Explicit MIME type. Only forwarded to the "small" upload, where it
            falls back to application/octet-stream when omitted. The "chunk"
            upload ignores it and always sends application/octet-stream.
        if_none_match : :class:`bool`
            When True (default) only succeeds if column empty. When False overwrites (If-Match: *).

        Raises
        ------
        FileNotFoundError
            If ``path`` is not an existing regular file.
        ValueError
            If ``mode`` is not one of the supported strategies, or a delegated
            upload rejects its arguments.
        """
        import os

        mode = (mode or "auto").lower()
        if mode == "auto":
            # The size is needed to choose a strategy, so the file must exist up front.
            if not os.path.isfile(path):
                raise FileNotFoundError(f"File not found: {path}")
            size = os.path.getsize(path)
            mode = "small" if size < self._SMALL_UPLOAD_LIMIT_BYTES else "chunk"

        if mode == "small":
            return self._upload_file_small(
                entity_set, record_id, file_name_attribute, path, content_type=mime_type, if_none_match=if_none_match
            )
        if mode == "chunk":
            return self._upload_file_chunk(
                entity_set, record_id, file_name_attribute, path, if_none_match=if_none_match
            )
        raise ValueError(f"Invalid mode '{mode}'. Use 'auto', 'small', or 'chunk'.")

    def _upload_file_small(
        self,
        entity_set: str,
        record_id: str,
        file_name_attribute: str,
        path: str,
        content_type: Optional[str] = None,
        if_none_match: bool = True,
    ) -> None:
        """Upload a file via a single PATCH request.

        The whole file is read into memory and sent as the request body.

        Parameters
        ----------
        entity_set : :class:`str`
            Target entity set (plural logical name), e.g. "accounts".
        record_id : :class:`str`
            GUID of the target record.
        file_name_attribute : :class:`str`
            Logical name of the file column attribute.
        path : :class:`str`
            Local filesystem path to the file.
        content_type : :class:`str` | None
            Value for the ``Content-Type`` header; defaults to application/octet-stream.
        if_none_match : :class:`bool`
            When True sends ``If-None-Match: null``; when False sends ``If-Match: *``.

        Raises
        ------
        ValueError
            If ``record_id`` is empty or the file is larger than 128 MiB.
        FileNotFoundError
            If ``path`` is not an existing regular file.
        """
        import os

        if not record_id:
            raise ValueError("record_id required")
        if not os.path.isfile(path):
            raise FileNotFoundError(f"File not found: {path}")

        size = os.path.getsize(path)
        limit = self._SMALL_UPLOAD_LIMIT_BYTES
        if size > limit:
            raise ValueError(f"File size {size} exceeds single-upload limit {limit}; use chunk mode.")

        with open(path, "rb") as fh:
            data = fh.read()

        fname = os.path.basename(path)
        key = self._format_key(record_id)
        url = f"{self.api}/{entity_set}{key}/{file_name_attribute}"
        headers = {
            "Content-Type": content_type or "application/octet-stream",
            "x-ms-file-name": fname,
        }
        if if_none_match:
            headers["If-None-Match"] = "null"
        else:
            headers["If-Match"] = "*"

        # No explicit ``expected`` is passed, so ``_request`` applies its default success handling.
        self._request("patch", url, headers=headers, data=data)
        return None

    def _upload_file_chunk(
        self,
        entity_set: str,
        record_id: str,
        file_name_attribute: str,
        path: str,
        if_none_match: bool = True,
    ) -> None:
        """Stream a local file using Dataverse native chunked PATCH protocol.

        1. Initial PATCH with header x-ms-transfer-mode: chunked (empty body) to start session.
        2. Subsequent PATCH calls to Location URL including sessiontoken with binary body segments
           and headers. Returns 206 for partial chunks and 204 on final.

        The chunk size is taken from the ``x-ms-chunk-size`` header of the initial response
        and falls back to 4 MiB when that header is missing, zero, or not an integer.
        An empty file starts a session but sends no chunk requests.

        Parameters
        ----------
        entity_set : :class:`str`
            Target entity set (plural logical name), e.g. "accounts".
        record_id : :class:`str`
            GUID of the target record.
        file_name_attribute : :class:`str`
            Logical name of the file column attribute.
        path : :class:`str`
            Local filesystem path to the file.
        if_none_match : :class:`bool`
            When True sends ``If-None-Match: null`` to only succeed if the column is currently empty.
            Set False to always overwrite (uses ``If-Match: *``).

        Returns
        -------
        None
            Returns nothing on success. Any failure raises an exception.

        Raises
        ------
        ValueError
            If ``record_id`` is empty or the server advertises a negative chunk size.
        FileNotFoundError
            If ``path`` is not an existing regular file.
        RuntimeError
            If the initial response has no ``Location`` header, or the file yields
            fewer bytes than the size measured before the upload started.
        """
        import os
        from urllib.parse import quote

        if not record_id:
            raise ValueError("record_id required")
        if not os.path.isfile(path):
            raise FileNotFoundError(f"File not found: {path}")

        total_size = os.path.getsize(path)
        fname = os.path.basename(path)
        key = self._format_key(record_id)
        init_url = f"{self.api}/{entity_set}{key}/{file_name_attribute}?x-ms-file-name={quote(fname)}"
        headers = {
            "x-ms-transfer-mode": "chunked",
        }
        if if_none_match:
            headers["If-None-Match"] = "null"
        else:
            headers["If-Match"] = "*"

        r_init = self._request("patch", init_url, headers=headers, data=b"")

        # Both spellings are tried in case ``headers`` is a plain, case-sensitive mapping.
        location = r_init.headers.get("Location") or r_init.headers.get("location")
        if not location:
            raise RuntimeError("Missing Location header with sessiontoken for chunked upload")

        rec_hdr = r_init.headers.get("x-ms-chunk-size") or r_init.headers.get("X-MS-CHUNK-SIZE")
        try:
            recommended_size = int(rec_hdr) if rec_hdr else None
        except (TypeError, ValueError):
            # An unparsable recommendation is ignored in favour of the default size.
            recommended_size = None
        effective_size = recommended_size or self._DEFAULT_CHUNK_SIZE_BYTES
        if effective_size <= 0:
            raise ValueError("effective chunk size must be positive")

        uploaded_bytes = 0
        with open(path, "rb") as fh:
            while uploaded_bytes < total_size:
                # Never read past the size announced in Content-Range, even if the
                # file has grown since it was measured.
                chunk = fh.read(min(effective_size, total_size - uploaded_bytes))
                if not chunk:
                    # The file shrank after it was measured; finishing quietly would
                    # leave the upload session incomplete without telling the caller.
                    raise RuntimeError(
                        f"File ended after {uploaded_bytes} of {total_size} bytes; chunked upload is incomplete"
                    )
                start = uploaded_bytes
                end = start + len(chunk) - 1
                # NOTE(review): the file name is sent verbatim here, whereas the initial
                # URL percent-encodes it - confirm behaviour for non-ASCII names.
                c_headers = {
                    "x-ms-file-name": fname,
                    "Content-Type": "application/octet-stream",
                    "Content-Range": f"bytes {start}-{end}/{total_size}",
                    "Content-Length": str(len(chunk)),
                }
                # Each chunk returns 206 (partial) or 204 (final). Accept both.
                self._request("patch", location, headers=c_headers, data=chunk, expected=(206, 204))
                uploaded_bytes += len(chunk)
        return None

    def _download_file(
        self,
        entity_set: str,
        record_id: str,
        file_name_attribute: str,
    ) -> tuple[str, bytes]:
        """
        Download a file from a Dataverse file column.

        Issues a GET for the column's ``/$value`` and expects status 200.

        :param entity_set: Source entity set (plural logical name), e.g. "accounts".
        :param record_id: GUID of the record.
        :param file_name_attribute: Logical name of the file column attribute.
        :return: Tuple with file name (from the ``x-ms-file-name`` response header) and file content.
        :raises ValueError: If ``record_id`` is empty or the response has no ``x-ms-file-name`` header.
        """
        # Same guard as the upload methods, so an empty id never reaches the URL.
        if not record_id:
            raise ValueError("record_id required")

        key = self._format_key(record_id)
        url = f"{self.api}/{entity_set}{key}/{file_name_attribute}/$value"
        response = self._request("get", url, expected=(200,))

        file_name = response.headers.get("x-ms-file-name")
        if file_name is None:
            raise ValueError(
                "Response is missing the 'x-ms-file-name' header. The file column may be empty or the server did not return the expected header."
            )
        return file_name, response.content

    def _delete_file(
        self,
        entity_set: str,
        record_id: str,
        file_name_attribute: str,
    ) -> None:
        """
        Delete a file from a Dataverse file column.

        Issues a DELETE for the column URL and expects status 204.

        :param entity_set: Target entity set (plural logical name), e.g. "accounts".
        :param record_id: GUID of the record.
        :param file_name_attribute: Logical name of the file column attribute.
        :return: None
        :raises ValueError: If ``record_id`` is empty.
        """
        # Same guard as the upload methods, so an empty id never reaches the URL.
        if not record_id:
            raise ValueError("record_id required")

        key = self._format_key(record_id)
        url = f"{self.api}/{entity_set}{key}/{file_name_attribute}"
        self._request("delete", url, expected=(204,))
        return None