Ich habe eine 3 GB große CSV-Datei mit Spalten für ['id','object_names ','file_paths','size','upload'] mit dtypes {'id':'int','object_names':'str','file_paths':'str','size':'int','upload':'bool'}.
Ich mache:
Code: Select all
zip_list = pd.read_csv(list_path).drop('upload',axis=1)
zip_list.object_names = zip_list.object_names.apply(ast.literal_eval)
zip_list.file_paths = zip_list.file_paths.apply(ast.literal_eval)
Dann:
Code: Select all
cmp_futures = []
for id in zip_list['id'].values:
cmp_futures.append(client.submit(
compare_zip_list,
zip_list[zip_list.id == id]['object_names'].values[0], # should be a 1-d list
id, # should be unique
current_objects))
Code: Select all
compare_zip_list
Dann mache ich:
Code: Select all
zip_list['upload'] = client.gather(cmp_futures)
Code: Select all
def compare_zip_list(object_names, id, current_objects):
dask.distributed.print(id) # or built-in print with flush=True
# code to check if the present object_list and id exists as a zip
Code: Select all
0
backup_0.zip already exists and file lists match - skipping.
0
backup_0.zip already exists and file lists match - skipping.
1
backup_1.zip already exists and file lists match - skipping.
2
backup_2.zip already exists and file lists match - skipping.
1
backup_1.zip already exists and file lists match - skipping.
2
backup_2.zip already exists and file lists match - skipping.
...
Fragen:< /p>
- Ist es möglich, dass die Funktion fehlschlägt, ohne einen Fehler auszulösen, und Dask es erneut versucht?
- Verwende ich client.submit falsch? Müssen Sie komprimieren und *args angeben?
- Etwas anderes?
- Ein weiterer möglicher Hinweis ist, dass dies derzeit sehr langsam ist - zip_list[zip_list.id == id]['object_names'].values[0] ist die langsamste oben gezeigte Zeile, und die Funktion enthält zwei Aufrufe von pandas.DataFrame.isin, die ausgeführt werden current_objects, die ich zuvor client.scatter, aber wenn ich wirklich jedem Worker eine Zeile des großen Datenrahmens gebe, verstehe ich nicht, warum es langsam ist.