How do I improve loading thousands of tiny JSON files into a Spark dataframe?
I have about 30,000 very tiny JSON files that I am attempting to load into a Spark dataframe (from a mounted S3 bucket). It is reported here and here that there may be performance issues, described as the Hadoop Small Files Problem. Unlike what has been previously reported, I am not recursing into directories (all my JSON files are in one sub-folder). My code to load the JSON files looks like the following.
val df = spark
  .read
  .option("multiline", "true")
  .json("/mnt/mybucket/myfolder/*.json")
  .cache
So far, my job seems "stuck". I see 2 stages:
- Job 0, Stage 0: Listing leaf files and directories
- Job 1, Stage 1: val df = spark .read .option("multiline", "...
Job 0, Stage 0 is quite fast, less than 1 minute. Job 1, Stage 1, however, takes forever to even show up (I lost track of time, but between the two we are talking 20+ minutes), and when it does show up in the jobs UI, it seems to be stuck (I am still waiting for any progress to be reported after 15+ minutes). Interestingly, Job 0, Stage 0 has 200 tasks (I see 7 executors being used), while Job 1, Stage 1 has only 1 task (it seems only 1 node/executor is being used; what a waste!).
Is there any way to make this seemingly simple step of loading 30,000 files faster or more performant?
Something I thought about was simply merging these files into larger ones; for example, merging 1,000 JSON files into 30 bigger ones (using NDJSON). However, I am skeptical of this approach, since merging the files (say, with Python) might itself take a long time (even the native Linux ls command in this directory takes an awfully long time to return); this approach might also defeat the purpose of end-to-end cluster computing (not very elegant).
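For what it's worth, the merge itself is a short, single-pass script. A minimal sketch in plain Python (the directory names and batch size here are hypothetical, not from my actual setup):

```python
import json
import os

def merge_to_ndjson(src_dir, out_dir, batch_size=1000):
    """Merge many small JSON files into NDJSON batches of batch_size files each."""
    os.makedirs(out_dir, exist_ok=True)
    names = sorted(n for n in os.listdir(src_dir) if n.endswith(".json"))
    for i in range(0, len(names), batch_size):
        out_path = os.path.join(out_dir, f"batch-{i // batch_size:05d}.json")
        with open(out_path, "w") as out:
            for name in names[i:i + batch_size]:
                with open(os.path.join(src_dir, name)) as f:
                    record = json.load(f)             # parse one tiny JSON file
                out.write(json.dumps(record) + "\n")  # one record per line (NDJSON)
    return len(names)
```

The resulting batch files are newline-delimited, so Spark should be able to read them with spark.read.json(...) without the multiline option and split them across tasks.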
json apache-spark
asked Nov 8 at 4:39 by Jane Wayne
edited Nov 8 at 4:51 by cricket_007
2 Answers
Merging the JSON files into much larger, newline-delimited files (aim for one file, or at most 10, not 30) is really the only option here.
Python opening 30K files isn't going to be any slower than what you're already doing; it just won't be distributed.
Besides that, multiline=true was only added for the case where you already have a really large JSON file whose content is a single top-level array or object. Before that option existed, "JSON Lines" was the only format Spark could read.
The most consistent solution here would be to fix the ingestion pipeline that's writing all these files, so that you can accumulate records ahead of time and then dump larger batches. Or just use Kafka rather than reading data from S3 (or any similar filesystem).
– cricket_007, answered Nov 8 at 4:43 (edited Nov 8 at 4:48)
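The splittability point can be illustrated outside Spark: in a JSON Lines file every line is a complete document, so any slice of lines can be parsed independently, whereas a file holding one top-level array has no single line that is valid JSON on its own. A small plain-Python illustration (the data is made up):

```python
import json

# JSON Lines: each line stands alone, so any worker can parse any slice of lines.
ndjson = '{"id": 1}\n{"id": 2}\n{"id": 3}\n'
records = [json.loads(line) for line in ndjson.splitlines()]

# A top-level array: no individual line is valid JSON, so one task has to read
# the whole file -- this is the case multiline=true was added to handle.
multiline = '[\n  {"id": 1},\n  {"id": 2},\n  {"id": 3}\n]'
whole = json.loads(multiline)
```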
I don't have control over the creation of the JSON files.
– Jane Wayne
Nov 8 at 4:56
Well, that's unfortunate. Requesting a bunch of files at once from S3 generally means one round-trip HTTP request for every file. As you can imagine, that's the bottleneck here, not Spark, and all other tools will have the same problem.
– cricket_007
Nov 8 at 4:58
Your feedback is making me think that Spark or Hadoop-esque systems may suffer without data locality. I have 30,000 wave (audio) files corresponding to those JSON files and I was thinking of using Spark for audio processing as well. Fundamentally, I suppose the same issues will manifest as well? With JSON at least there's NDJSON, but what about binary/wav files? Any tips on that? The wave files are 5-10 MB each.
– Jane Wayne
Nov 8 at 6:13
I guess I can create my own Parquet file locally and upload that to S3.
– Jane Wayne
Nov 8 at 6:18
I don't know how audio files make sense in a columnar format with specific types, but I would suggest Bzip2-format archives
– cricket_007
Nov 8 at 15:25
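Regarding the audio follow-up in the comments: Spark can also ingest whole small binary files via SparkContext.binaryFiles, which yields one (path, bytes) record per file without forcing the audio into a columnar schema. A rough, untested PySpark sketch (the path and the size-only processing step are placeholders for real audio handling); note that the per-file HTTP round-trip cost discussed above still applies:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# binaryFiles yields (path, content) pairs, one per file, read in parallel;
# each small .wav arrives as a single bytes object.
waves = spark.sparkContext.binaryFiles("/mnt/mybucket/wavfolder/*.wav")

def file_size(pair):
    path, data = pair
    return (path, len(data))  # placeholder: real code would process the audio here

sizes = waves.map(file_size)
```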
There are two HTTP requests per read: one HEAD and one GET. If the files are all kept in the same directory, then the listing cost is just one LIST call per 5,000 objects, so six LIST calls. You'll pay ~$25 for 30K HEAD and GET calls.
If you use Spark to take the listing and generate a record from each individual file, you also pay the overhead of scheduling a task per file. You can do a trick where the listing itself (which you do in your .py code) becomes the input RDD (i.e. one row per file), the map() becomes the read of that file, and the output of the map is the record representing that single file (scala example). This addresses the Spark scheduling overhead, as the input listing will be split into bigger parts pushed out to the workers, leaving only those HTTP HEAD/GET calls.
For this to work efficiently, use the Hadoop 2.8+ JARs, and do the listing with FileSystem.listFiles(Path, true) to get a single recursive listing of the entire directory tree under the path, which uses the S3 LIST API at its most optimal.
(Once you've done this, why not post the code up somewhere for others?)
– Steve Loughran, answered Nov 8 at 11:06
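The trick described in this answer might look roughly like the following in PySpark. This is an untested sketch, and the paths, partition count, and helper name are all made up:

```python
import json

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# The listing is done once, up front (one S3 LIST call per 5,000 keys);
# a small literal list stands in here for the real listing code.
paths = [
    "/mnt/mybucket/myfolder/a.json",  # hypothetical paths
    "/mnt/mybucket/myfolder/b.json",
]

# The listing itself becomes the input RDD: one row per file, chopped into
# a modest number of partitions instead of one Spark task per file.
paths_rdd = sc.parallelize(paths, numSlices=8)

def read_one(path):
    # The map() is the read: one HEAD/GET pair per file on S3, but many
    # files are handled per task.
    with open(path) as f:
        return json.load(f)

records = paths_rdd.map(read_one)
df = spark.createDataFrame(records)  # infer a schema from the parsed records
```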