Azure Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.

Distinguish Structured Streaming queries in the Spark UI

Give each stream a unique query name by adding .queryName(<query_name>) to your writeStream code so you can easily tell which metrics belong to which stream in the Spark UI.
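
For example, a minimal Python sketch (the rate source, memory sink, and the name orders_bronze_stream are placeholders for your own pipeline):

(spark.readStream
    .format("rate")                     # placeholder source
    .load()
    .writeStream
    .queryName("orders_bronze_stream")  # this name appears in the Spark UI Streaming tab
    .format("memory")                   # placeholder sink
    .start())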

Push Structured Streaming metrics to external services

Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark’s Streaming Query Listener interface. In Databricks Runtime 11.0 and above, the Streaming Query Listener is available in Python and Scala.

Important

Credentials and objects managed by Unity Catalog cannot be used in StreamingQueryListener logic.

Processing latency associated with listeners can adversely impact query processing. Databricks recommends minimizing processing logic in these listeners and writing to low latency sinks such as Kafka.

The following code provides basic examples of the syntax for implementing a listener:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *       Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

from pyspark.sql.streaming import StreamingQueryListener


class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass


my_listener = MyListener()
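
Once a listener is defined, attach it to the Spark session with spark.streams.addListener so it receives events for every streaming query on that session. The Python sketch below also shows one possible way to forward progress metrics to an external service; the MetricsForwarder class, the requests call, and the endpoint URL are illustrative assumptions, not part of the Databricks API.

import requests  # hypothetical HTTP client for an external metrics service

from pyspark.sql.streaming import StreamingQueryListener


class MetricsForwarder(StreamingQueryListener):
    """Illustrative listener that forwards progress metrics to an external endpoint."""

    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # Keep the work here minimal; slow listeners delay query processing.
        requests.post(
            "https://metrics.example.com/ingest",  # placeholder endpoint
            json={
                "queryName": event.progress.name,
                "batchId": event.progress.batchId,
                "numInputRows": event.progress.numInputRows,
            },
            timeout=5,
        )

    def onQueryTerminated(self, event):
        pass


# Register the listeners defined above on the active Spark session.
spark.streams.addListener(my_listener)
spark.streams.addListener(MetricsForwarder())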

Defining observable metrics in Structured Streaming

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map. For a batch-oriented Python sketch, see the example after this list.

  • Streaming, or micro-batch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Azure Databricks does not support continuous execution streaming.

    For example:

    Scala

    // Observe row count (rc) and error row count (erc) in the streaming Dataset
    val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
    observed_ds.writeStream.format("...").start()

    // Monitor the metrics using a listener
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        event.progress.observedMetrics.get("my_event").foreach { row =>
          // Trigger if the number of errors exceeds 5 percent
          val num_rows = row.getAs[Long]("rc")
          val num_error_rows = row.getAs[Long]("erc")
          val ratio = num_error_rows.toDouble / num_rows
          if (ratio > 0.05) {
            // Trigger alert
          }
        }
      }
    })
    

    Python

    from pyspark.sql.functions import col, count, lit
    from pyspark.sql.streaming import StreamingQueryListener

    # Observe metric
    observed_df = df.observe("metric", count(lit(1)).alias("cnt"), count(col("error")).alias("malformed"))
    observed_df.writeStream.format("...").start()

    # Define my listener.
    class MyListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            print(f"'{event.name}' [{event.id}] got started!")
        def onQueryProgress(self, event):
            row = event.progress.observedMetrics.get("metric")
            if row is not None:
                if row.malformed / row.cnt > 0.5:
                    print("ALERT! Ouch! there are too many malformed "
                          f"records {row.malformed} out of {row.cnt}!")
                else:
                    print(f"{row.cnt} rows processed!")
        def onQueryTerminated(self, event):
            print(f"{event.id} got terminated!")

    # Add my listener.
    spark.streams.addListener(MyListener())
    
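For batch queries in Python, a lightweight alternative sketch for collecting the same observed metrics uses the pyspark.sql.Observation helper (available in Spark 3.3 and above) instead of a listener; the DataFrame, column names, and metric names below are placeholders:

from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import col, count, lit

# `spark` is predefined in Databricks notebooks; getOrCreate() keeps the sketch self-contained.
spark = SparkSession.builder.getOrCreate()

# Placeholder batch DataFrame with an "error" column, mirroring the streaming example.
df = spark.createDataFrame([(1, None), (2, "boom"), (3, None)], ["id", "error"])

# Attach named aggregate metrics to the batch query.
observation = Observation("my_metrics")
observed_df = df.observe(
    observation,
    count(lit(1)).alias("rc"),
    count(col("error")).alias("erc"),
)

# Metrics become available once an action on the observed DataFrame completes.
observed_df.collect()
print(observation.get)  # e.g. {'rc': 3, 'erc': 1}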