Pyspark Impala jdbc Driver does not support this optional feature























I am using PySpark for Spark Streaming. I can consume the stream and create the DataFrame without issues. I was also able to insert data into an Impala table created with only a few (5) sampled columns out of the 72 columns in the Kafka message. But when I create a new table with the proper data types and all columns, and the DataFrame likewise contains every column from the Kafka message, I get the exception below.




java.sql.SQLFeatureNotSupportedException: [Cloudera]JDBC Driver does not support this optional feature.
at com.cloudera.impala.exceptions.ExceptionConverter.toSQLException(Unknown Source)
at com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
at com.cloudera.impala.jdbc.common.SPreparedStatement.setNull(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:627)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)




I have searched a lot for this but could not find a solution. I also enabled debug logs, but they still do not say which feature the driver does not support.
Any help or guidance would be appreciated.
Thank you



Version details :



pyspark : 2.2.0
Kafka : 0.10.2
Cloudera : 5.15.0
Cloudera Impala : 2.12.0-cdh5.15.0
Cloudera Impala JDBC driver : 2.6.4



The code I have used :



import json
from pyspark import SparkContext, SparkConf, HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import lit
from pyspark.sql.types import *

conf = SparkConf().setAppName("testkafkarecvstream")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)  # 10-second batch interval
spark = SparkSession.builder.appName("testkafkarecvstream").getOrCreate()
jdbcUrl = "jdbc:impala://hostname:21050/dbName;AuthMech=0;"

# Schema for the 72 columns of the Kafka message (middle columns elided)
fields = [
    StructField("column_name01", StringType(), True),
    StructField("column_name02", StringType(), True),
    StructField("column_name03", DoubleType(), True),
    StructField("column_name04", StringType(), True),
    StructField("column_name05", IntegerType(), True),
    StructField("column_name06", StringType(), True),
    .....................
    StructField("column_name72", StringType(), True),
]

schema = StructType(fields)


# Build a Row from one parsed JSON message
def make_rows(parts):
    customRow = Row(column_name01=datatype(parts['column_name01']),
                    .....,
                    column_name72=datatype(parts['column_name72'])
                    )
    return customRow


# Convert each micro-batch RDD to a DataFrame and append it to Impala
def createDFToParquet(rdd):
    try:
        df = spark.createDataFrame(rdd, schema)
        df.show()
        df.write.jdbc(jdbcUrl,
                      table="table_name",
                      mode="append")
    except Exception as e:
        print(str(e))


zkNode = "zkNode_name:2181"
topic = "topic_name"

# Receiver method
kvs = KafkaUtils.createStream(ssc,
                              zkNode,
                              "consumer-group-id",
                              {topic: 5},
                              {"auto.offset.reset": "smallest"})

lines = kvs.map(lambda x: x[1])  # keep only the message value
conv = lines.map(lambda x: json.loads(x))
table = conv.map(make_rows)
table.foreachRDD(createDFToParquet)

table.pprint()

ssc.start()
ssc.awaitTermination()
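
Note on the trace: the exception is raised from `SPreparedStatement.setNull`, which `JdbcUtils.savePartition` calls when a row holds a null in some column. That suggests, without confirming it, that the failure is tied to null values in one or more of the 72 columns rather than to the insert as a whole. A minimal sketch for narrowing this down is below; it runs on raw JSON messages without Spark. The helper name `count_nulls` and the sample messages are hypothetical and only illustrate the idea.

```python
import json
from collections import Counter


def count_nulls(messages, column_names):
    """Count, per column, how many messages have a missing or null value."""
    nulls = Counter()
    for raw in messages:
        record = json.loads(raw)
        for name in column_names:
            # A key that is absent or explicitly null would become a null in the DataFrame
            if record.get(name) is None:
                nulls[name] += 1
    return dict(nulls)


# Hypothetical sample messages; real ones would be captured from the Kafka topic
sample = [
    '{"column_name01": "a", "column_name03": 1.5, "column_name05": null}',
    '{"column_name01": "b", "column_name03": null, "column_name05": 7}',
    '{"column_name01": null, "column_name03": null, "column_name05": 3}',
]
columns = ["column_name01", "column_name03", "column_name05"]
print(count_nulls(sample, columns))
```

Comparing the columns reported here with the 5 columns that inserted successfully may show which column types the driver rejects when they are null.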

































  • Are you trying to define an array or a struct?
    – karma4917
    Nov 9 at 16:37










  • Can you show some code you tried so far? Also, what version of JDBC are you using for Impala?
    – karma4917
    Nov 9 at 16:54










  • Q: Are you trying to define an array or a struct? A: I have defined an array for the schema. Q: Also, what version of JDBC are you using for Impala? A: Cloudera Impala JDBC driver 2.6.4
    – Kaustubh Desai
    Nov 11 at 11:00

















jdbc pyspark spark-streaming cloudera impala














edited Nov 11 at 11:50

























asked Nov 9 at 12:53









Kaustubh Desai























