Avoid specifying schema twice (Spark/scala)
I need to iterate over a data frame in a specific order and apply some complex logic to calculate a new column.
Also, my strong preference is to do it in a generic way, so that I do not have to list all columns of a row and do df.as[my_record] or case Row(...) => as shown here. Instead, I want to access row columns by their names and just add the result column(s) to the source row.
The approach below works just fine, but I'd like to avoid specifying the schema twice: the first time so that I can access columns by name while iterating, and the second time to process the output.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
if (iter.hasNext) {
def complex_logic(p: Int): Integer = if (p == 3) null else p * 10
val head = iter.next()
// 1st schema: built from the first row so that columns can be accessed by name
val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
val r =
new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)
// r1 (the previous output row) is available for order-dependent logic but unused here
iter.scanLeft(r)((r1, r2) =>
new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
)
} else iter
}
// 2nd schema: needed again on the driver to build the encoder for mapPartitions
val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
// $"..." relies on spark.implicits._, which spark-shell imports automatically
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show
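In f_row, scanLeft emits its seed r first and then one row per remaining element. Because the accumulator r1 is ignored, the result is the same as transforming every row independently; the accumulator only matters when a row's result depends on the previous one, which is where the sortWithinPartitions ordering counts. A plain-Scala sketch of both cases, runnable without Spark (Rec, complexLogic and the running-sum rule are hypothetical stand-ins, not part of the question):

```scala
object ScanLeftSketch {
  // Hypothetical stand-in for an output row: the id plus the computed result.
  final case class Rec(id: Int, result: Option[Int])

  // Same rule as complex_logic in the question, with None in place of null.
  def complexLogic(p: Int): Option[Int] = if (p == 3) None else Some(p * 10)

  // Mirrors f_row: the seed is built from the head, and the previous record is ignored,
  // so this is equivalent to mapping every element on its own.
  def independent(ids: Iterator[Int]): Iterator[Rec] =
    if (ids.hasNext) {
      val head = ids.next()
      ids.scanLeft(Rec(head, complexLogic(head)))((_, id) => Rec(id, complexLogic(id)))
    } else Iterator.empty

  // Order-dependent variant: each result uses the previous record's result
  // (a hypothetical running sum of ids), so the input order changes the output.
  def dependent(ids: Iterator[Int]): Iterator[Rec] =
    if (ids.hasNext) {
      val head = ids.next()
      ids.scanLeft(Rec(head, Some(head))) { (prev, id) =>
        Rec(id, Some(prev.result.getOrElse(0) + id))
      }
    } else Iterator.empty
}
```

For ids 1, 2, 3, 4 in that order, independent yields the results 10, 20, None, 40, while dependent yields the running sums 1, 3, 6, 10.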
What information is lost after applying mapPartitions, so that the output cannot be processed without an explicit encoder? How to avoid specifying it?
Update
Well, maybe it's tricky to avoid, because the result is produced in the executor(s) and the driver (which displays the output) does not know the structure of the result, but I'm still not convinced that it's impossible to avoid.
scala apache-spark apache-spark-sql apache-spark-dataset
edited Nov 8 at 16:10
asked Nov 8 at 11:10
Dr Y Wit
1,260311
Can you use Dataset and provide a function that maps from T => U?
– Terry Dactyl
Nov 8 at 13:00
@TerryDactyl, can you elaborate a bit more? I use df.repartition($"part").sortWithinPartitions($"id"), which is org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], and I supply f_row(iter: Iterator[Row]) : Iterator[Row] for mapPartitions.
– Dr Y Wit
Nov 8 at 13:09
A DataFrame is a Dataset[Row] and, as discussed below, Row is untyped. If you were to provide case classes T and U which correspond to the input and output shapes of your map function Iterator[T] => Iterator[U], and import spark.implicits._, then Spark may be able to provide an Encoder and you would effectively map from Dataset[T] => Dataset[U]. This is just a guess but may be worth a try.
– Terry Dactyl
Nov 8 at 13:16
See below.
– Terry Dactyl
Nov 8 at 14:13
3 Answers
What information is lost after applying mapPartitions so output cannot be processed without explicit encoder?
The information is hardly lost; it wasn't there from the beginning. Subclasses of Row or InternalRow are basically untyped, variable-shape containers, which don't provide any type information that could be used to derive an Encoder.
The schema in GenericRowWithSchema is inconsequential, as it describes the content in terms of metadata, not types. An implicit Encoder[U] can only be derived from the static type U at compile time, whereas the schema of a GenericRowWithSchema is a runtime value carried by each individual row.
How to avoid specifying it?
Sorry, you're out of luck. If you want to use dynamically typed constructs (a bag of Any) in a statically typed language, you have to pay the price, which here is providing an Encoder.
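The point about static types can be reproduced without Spark. In the sketch below, Enc, Rec, MiniRow, mapPartitionsLike and rowEnc are hypothetical stand-ins (not Spark's API) for Encoder, a case class, Row, Dataset.mapPartitions and RowEncoder. The context bound U : Enc is resolved by the compiler from the type U alone, so a case class can carry an instance in its companion, while a row that only knows its schema at runtime needs an instance built from that schema and passed explicitly, just like mapPartitions(f_row)(encoder) in the question.

```scala
object EncoderSketch {
  // Hypothetical stand-in for Spark's Encoder: it only records output column names.
  trait Enc[T] { def fieldNames: List[String] }

  // A statically typed record: its instance lives in the companion and is found at compile time.
  final case class Rec(part: Int, id: Int)
  object Rec {
    implicit val enc: Enc[Rec] = new Enc[Rec] { val fieldNames = List("part", "id") }
  }

  // A Row-like bag of Any whose shape is known only at runtime.
  final case class MiniRow(values: Vector[Any], schema: List[String])

  // Same shape as Dataset.mapPartitions[U : Encoder]: the instance is chosen for the
  // type U before any element is produced, so it cannot inspect a row's schema.
  def mapPartitionsLike[T, U: Enc](data: List[T])(f: Iterator[T] => Iterator[U]): (List[String], List[U]) =
    (implicitly[Enc[U]].fieldNames, f(data.iterator).toList)

  // There is no implicit Enc[MiniRow]; one has to be built from a runtime schema.
  def rowEnc(schema: List[String]): Enc[MiniRow] = new Enc[MiniRow] { val fieldNames = schema }
}
```

Calling mapPartitionsLike on a List[MiniRow] without the trailing rowEnc(...) argument should fail to compile with a missing-implicit error, which mirrors calling mapPartitions on a Dataset[Row] without an encoder.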
answered Nov 8 at 12:27
user10623686
1
I do not agree that "it wasn't there from the beginning". I can show the result after repartitioning and ordering with df.repartition($"part").sortWithinPartitions($"id").show, but it's not possible after mapPartitions.
– Dr Y Wit
Nov 8 at 12:35
And the function used in mapPartitions is (func: (Iterator[T]) ⇒ Iterator[U]). So why can show not be used if the function produces pretty much the same records (with new columns, to be precise) after iterating?
– Dr Y Wit
Nov 8 at 12:38
OK, I have checked some of my Spark code, and using .mapPartitions with the Dataset API does not require me to explicitly build or pass an encoder.
You need something like:
case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)
import spark.implicits._
// Note: column names/types must match the case class constructor parameters.
val beforeDS = df.as[Before] // df: however you obtain your input DataFrame
def f_row(it: Iterator[Before]): Iterator[After] = ??? // your per-partition logic goes here
// Encoder[After] is derived implicitly from the case class via spark.implicits._
beforeDS.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
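For comparison, a plain-Scala sketch of what f_row looks like with this typed approach when filled in with the question's logic (no Spark needed to run it). It assumes a result: Option[Int] field instead of newCol: String, so that the question's null for id 3 becomes None, which a case-class encoder stores as NULL. Every field of Before has to be copied into After by name, which is the coupling the question wants to avoid:

```scala
object TypedFRowSketch {
  final case class Before(part: Int, id: Int)
  // Assumption: Option[Int] models the nullable "result" column.
  final case class After(part: Int, id: Int, result: Option[Int])

  // Same rule as complex_logic in the question, with None in place of null.
  def complexLogic(p: Int): Option[Int] = if (p == 3) None else Some(p * 10)

  // Each input field is listed explicitly; adding a column to Before means editing After and this map.
  def f_row(it: Iterator[Before]): Iterator[After] =
    it.map(b => After(b.part, b.id, complexLogic(b.id)))
}
```

In spark-shell, with import spark.implicits._ in scope, passing TypedFRowSketch.f_row to mapPartitions on a Dataset[TypedFRowSketch.Before] should pick up Encoder[After] implicitly, so no encoder is written by hand.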
answered Nov 8 at 14:13
Terry Dactyl
1,073412
As I mentioned in the original post, "my strong preference is to do it in a generic way so I do not have to list all columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns are added to the data frame. See stackoverflow.com/questions/53159461/…
– Dr Y Wit
Nov 8 at 15:50
I found the explanation below sufficient; maybe it will be useful for others.
mapPartitions requires an Encoder because otherwise it cannot construct a Dataset from an iterator of Rows. Even though each row has a schema, that schema cannot be derived (used) by the constructor of Dataset[U].
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}
On the other hand, without calling mapPartitions, Spark can use the schema derived from the initial query, because repartition and sortWithinPartitions do not change the structure (metadata) of the original columns.
The entire transformation on the DataFrame can be encapsulated in a function, especially since f_row does not make much sense without partitioning and sorting.
def addResult(df: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
// The output schema is defined once and shared by the output rows and the encoder
val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
// Picked up implicitly by mapPartitions below
implicit val encoder: org.apache.spark.sql.Encoder[Row] = RowEncoder(schema)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
if (iter.hasNext) {
def complex_logic(p: Int): Integer = if (p == 3) null else p * 10
val head = iter.next()
val r =
new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)
iter.scanLeft(r)((r1, r2) =>
new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
)
} else iter
}
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)
}
Test
scala> addResult(df).show
+----+---+------+
|part| id|result|
+----+---+------+
| 2| 1| 10|
| 2| 2| 20|
| 2| 3| null|
| 2| 4| 40|
+----+---+------+
edited 2 days ago
answered 2 days ago
Dr Y Wit
1,260311