Code: Select all
from concurrent.futures import ProcessPoolExecutor
import tarfile, rapidgzip
def processNdjson(ndjsonName):
with rapidgzip.open(inTarDir) as myZip:
myZip.import_index(rapidgzipDir)
with tarfile.open(fileobj=myZip, mode="r:*") as f:
member = f.getmember(ndjsonName)
dataFile = f.extractfile(member)
for oneLine in dataFile:
# process oneLine here
if __name__ == "__main__":
inTarDir = ...
rapidgzipDir = ...
nCore = 5
ndjsonNames = ["name1.ndjson", "name2.ndjson"]
with ProcessPoolExecutor(nCore) as pool:
results = pool.map(worker, ndjsonNames)
- ist das Verzeichnis einer .tar.gz-Datei, die mehrere .ndjson-Dateien enthält.
Code: Select all
inTarDir - ist die von rapidgzip zu verwendende Vorindexdatei. Dies ermöglicht einen schnellen Direktzugriff und ist ein direkter Ersatz für die in Python integrierte gzip.GzipFile.
Code: Select all
rapidgzipDir - Jeder Prozess wird
Code: Select all
with rapidgzip.open(inTarDir) as myZip:
myZip.import_index(rapidgzipDir)
with tarfile.open(fileobj=myZip, mode="r:*") as f:
Gibt es eine Möglichkeit, den mehrfachen Import desselben rapidgzipDir zu vermeiden?
Mobile version