Used it in production several weeks before first release
Was first adopter in Sberbank's production
Giving talks on it
Trying to use it EVERYWHERE
My story with data engineering
Data engineering is all about Java, Python and Scala.
And of course I've started with Scala — it's native!
But soon I realized that lots of things could be easier with Kotlin!
Kotlin key benefits
JVM
null-aware and null-safe type system
Extension methods (and values)
Reified generics
DSL-building abilities
Compatible (?) with other JVM ecosystem languages
Note on null-aware type system
class Z

fun main() {
    val nullZ: Z? = null
    val z = Z()
    println(nullZ is Z)  // false
    println(z is Z)      // true
    println(nullZ is Z?) // true
    println(z is Z?)     // true
}
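Nullability in the type also changes how values are used, not only how they are checked. A minimal sketch, assuming a variant of Z with a hypothetical name property (the Z on the slide has no members):

```kotlin
// Hypothetical variant of Z carrying a property, for illustration only.
class Z(val name: String)

// Safe call plus Elvis operator: the null case has to be handled explicitly.
fun describe(z: Z?): String = z?.name ?: "no Z"

// After an `is` check the compiler smart-casts `value` to String.
fun lengthOrMinusOne(value: Any?): Int = if (value is String) value.length else -1

fun main() {
    println(describe(null))         // no Z
    println(describe(Z("zed")))     // zed
    println(lengthOrMinusOne("ab")) // 2
    println(lengthOrMinusOne(null)) // -1
}
```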
Extension methods
fun Iterable<Int>.sum() = reduce { a, b -> a + b }
This already exists in stdlib
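The "and values" part of the benefits list refers to extension properties. A small sketch with made-up names (wordCount and product are not stdlib members):

```kotlin
// Extension property: reads like a field, but is just a getter on String.
val String.wordCount: Int
    get() = trim().split(Regex("\\s+")).count { it.isNotEmpty() }

// Extension function that, unlike sum(), is not in the stdlib.
fun Iterable<Int>.product(): Long = fold(1L) { acc, x -> acc * x }

fun main() {
    println("a b  c".wordCount)        // 3
    println(listOf(2, 3, 4).product()) // 24
}
```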
Reified generics
Generics on JVM can't be reified. Ever.
But there are inline methods in Kotlin. They are inlined at compile time:
inline fun runIt(func: () -> Unit) = func()
And if a method is inlined, we can reify its generic parameter at the call site!
inline fun <reified T> callIt(func: () -> T): T = func()
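What reification buys in practice: inside an inline function, a reified T can be used in `is` checks and class literals, which plain generics do not allow. A sketch with hypothetical helper names:

```kotlin
// Without `reified`, `it is T` would not compile: T is erased at runtime.
inline fun <reified T> Iterable<*>.countOf(): Int = count { it is T }

// The class literal of a reified parameter is available at the call site.
inline fun <reified T : Any> typeName(): String = T::class.java.simpleName

fun main() {
    val mixed = listOf(1, "a", 2.0, "b")
    println(mixed.countOf<String>()) // 2
    println(typeName<String>())      // String
}
```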
DSL-building
Extension functions combined with functional arguments let us do the following magic:
fun html(init: HTML.() -> Unit): HTML {
    val result = HTML()
    result.init() // or: return HTML().apply { init() }
    return result
}
fun HTML.h1(text: String) = addElement("<h1>$text</h1>")
html { h1("Example") }
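The slide does not show the HTML class itself. A self-contained sketch, assuming a minimal HTML holder with addElement and a hypothetical render method, could look like this:

```kotlin
// Hypothetical minimal HTML holder; only addElement is implied by the slide.
class HTML {
    private val elements = mutableListOf<String>()

    fun addElement(element: String) {
        elements += element
    }

    fun render(): String =
        elements.joinToString(separator = "", prefix = "<html>", postfix = "</html>")
}

// `init` is a lambda with receiver: inside it, `this` is the HTML instance.
fun html(init: HTML.() -> Unit): HTML = HTML().apply(init)

fun HTML.h1(text: String) = addElement("<h1>$text</h1>")
fun HTML.p(text: String) = addElement("<p>$text</p>")

fun main() {
    val page = html {
        h1("Example")
        p("Built with a lambda with receiver")
    }
    println(page.render())
    // <html><h1>Example</h1><p>Built with a lambda with receiver</p></html>
}
```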
And the journey begins
I mean
But why Frankenstein's?
And why monster?
It sounds like my surname
We need to crossbreed Kotlin and Scala to produce something with the best parts of both worlds!
And at the start of the experiment we have no idea how it will behave!
First goal
We want to make a standard, simple operation on a DataFrame work
List((1, "a"), (2, "b"))
  .toDS
  .map(_._1)
  .show
First sketch
val spark = SparkSession.orCreate
listOf("a" to "and", "b" to "beetle")
.map(MapFunction { it }, Encoders.bean())
.show
Fails
Encoder can't (de)serialize
OK, but
Let's try to make the operation work at least over primitives!
And it starts to look like a DSL!
And we love Spark being a DSL
Type inference!
Generics are everywhere
Generics are being erased at runtime
Need to find some hack
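To see what erasure means here: two lists with different element types share one runtime class, and that class only knows a type variable, not the actual argument. A small sketch (sameRuntimeClass is a made-up helper):

```kotlin
fun sameRuntimeClass(a: Any, b: Any): Boolean = a.javaClass == b.javaClass

fun main() {
    val ints: List<Int> = arrayListOf(1, 2, 3)
    val strings: List<String> = arrayListOf("a", "b")

    // Both are plain java.util.ArrayList at runtime.
    println(sameRuntimeClass(ints, strings)) // true

    // The class only exposes the declared type variable, not Int or String.
    println(ints.javaClass.typeParameters.map { it.name }) // [E]
}
```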
Jackson
public abstract class TypeReference<T> {
    protected final Type _type;

    protected TypeReference() {
        Type superClass = getClass().getGenericSuperclass();
        if (superClass instanceof Class<?>) {
            // ↑ sanity check, should never happen
            throw new IllegalArgumentException(/* */);
            // ↑ comment that not enough data
        }
        _type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
    }

    public Type getType() { return _type; }
}
Now let's translate it to Kotlin
abstract class TypeRef<T> protected constructor() {
    var type: ParameterizedType
    init {
        val sC = this::class.java.genericSuperclass
        require(sC !is Class<*>) { "error" }
        // ↑ should never happen
        this.type = sC as ParameterizedType
    }
}
Easy!
And use it…
fun obtainGenericDataSchema(typeImpl: ParameterizedTypeImpl): DataType {
    val z = typeImpl.rawType.kotlin.declaredMemberProperties
    val y = typeImpl.actualTypeArguments
    return StructType(
        KotlinReflectionHelper
            .dataClassProps(typeImpl.rawType.kotlin)
            .map {
                val dt = if (!it.c.isData)
                    JavaTypeInference.inferDataType(it.c.java)._1
                else null
                StructField(it.name, dt, it.nullable, Metadata.empty())
            }
            .toTypedArray()
    )
}
And it won't work
Getting a little scary, right?
Because Jackson's hack won't work in Kotlin
By the way, this is where the love story with the Monster begins!
Because it's boring when everything works.
What should I do now? Google!
inline fun <reified T : Any> getKType(): KType = object : SuperTypeTokenHolder<T>() {}.getKTypeImpl()

open class SuperTypeTokenHolder<T>

fun SuperTypeTokenHolder<*>.getKTypeImpl(): KType = javaClass.genericSuperclass.toKType().arguments.single().type!!

fun KClass<*>.toInvariantFlexibleProjection(arguments: List<KTypeProjection> = emptyList()): KTypeProjection {
    val args = if (java.isArray()) listOf(java.componentType.kotlin.toInvariantFlexibleProjection()) else arguments
    return KTypeProjection.invariant(createType(args, nullable = false))
}

fun Type.toKTypeProjection(): KTypeProjection = when (this) {
    is Class<*> -> this.kotlin.toInvariantFlexibleProjection()
    is ParameterizedType -> {
        val erasure = (rawType as Class<*>).kotlin
        erasure.toInvariantFlexibleProjection((erasure.typeParameters.zip(actualTypeArguments).map { (parameter, argument) ->
            val projection = argument.toKTypeProjection()
            projection.takeIf {
                parameter.variance == KVariance.INVARIANT || parameter.variance != projection.variance
            } ?: KTypeProjection.invariant(projection.type!!)
        }))
    }
    is WildcardType -> when {
        lowerBounds.isNotEmpty() -> KTypeProjection.contravariant(lowerBounds.single().toKType())
        upperBounds.isNotEmpty() -> KTypeProjection.covariant(upperBounds.single().toKType())
        else -> KTypeProjection.STAR
    }
    is GenericArrayType -> Array<Any>::class.toInvariantFlexibleProjection(listOf(genericComponentType.toKTypeProjection()))
    is TypeVariable<*> -> TODO() // TODO
    else -> throw IllegalArgumentException("Unsupported type: $this")
}

fun Type.toKType(): KType = toKTypeProjection().type!!
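The core trick in the snippet is the first line: an anonymous object created inside an inline reified function records the full generic supertype. A stripped-down sketch of just that part, returning a plain java.lang.reflect.Type instead of a KType (TypeToken and javaTypeOf are hypothetical names, not part of the googled code):

```kotlin
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type

// Hypothetical stand-in for SuperTypeTokenHolder.
open class TypeToken<T>

// Because T is reified, the anonymous subclass is generated per call site
// and its generic superclass carries the actual type argument.
inline fun <reified T> javaTypeOf(): Type {
    val holder = object : TypeToken<T>() {}
    val superType = holder.javaClass.genericSuperclass as ParameterizedType
    return superType.actualTypeArguments.single()
}

fun main() {
    val type = javaTypeOf<Map<String, List<Int>>>()
    println(type)
    println((type as ParameterizedType).rawType) // interface java.util.Map
}
```

The rest of the googled code only converts such a Type into Kotlin's KType.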
Add custom logic when there is a predefined schema:
case _ if predefinedDt.isDefined =>
  predefinedDt.get match {
    case dataType: KDataTypeWrapper =>
      val cls = dataType.cls
      val properties = getJavaBeanReadableProperties(cls)
      val structFields = dataType.dt.fields.map… // boring
      val fields = structFields.map { structField =>
        // recursive here …
      }
      createSerializerForObject(inputObject, fields)
Dark side of Scala
LOTS of debug here
Scala is very type safe. But codegen is not!
So use
LogLevel.DEBUG
"spark.sql.codegen.comments" = true to view the data flow
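A sketch of how these two settings might be wired up from Kotlin, assuming Spark 3.x (spark-sql) on the classpath and a local master. The app name and the sample query are made up for illustration:

```kotlin
import org.apache.spark.sql.SparkSession

fun main() {
    val spark = SparkSession.builder()
        .master("local[*]")
        .appName("codegen-debug") // hypothetical name
        // Static conf, so it has to be set before the session is created.
        .config("spark.sql.codegen.comments", "true")
        .getOrCreate()

    // At DEBUG level Spark logs the generated Java code.
    spark.sparkContext().setLogLevel("DEBUG")

    // Prints the generated code for this small query (Spark 3.0+).
    spark.range(3L).selectExpr("id + 1 AS next").explain("codegen")

    spark.stop()
}
```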