Deduplication is necessary in append-only systems. Because of at-least-once delivery, duplicates arise from technical issues such as retries, and because one entry should sometimes be superseded by a newer one.
For deduplication in SQL, the table needs two columns:
- a deduplication field, typically an ID
- a timestamp of when the row was inserted, to order by
Note: SELECT DISTINCT is not sufficient, especially when the created timestamps differ, as the sketch below shows.
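To make this concrete, here is a minimal sketch (table, column, and value names are illustrative, following the `mytable` example used below):

-- Hypothetical schema: `id` is the deduplication field, `time` the insert timestamp
CREATE TABLE `mytable` (
  id   STRING    NOT NULL,
  data STRING,
  time TIMESTAMP NOT NULL
);

-- The same event delivered twice (at-least-once delivery):
INSERT INTO `mytable` (id, data, time) VALUES
  ('evt-1', 'payload', TIMESTAMP '2024-01-01 10:00:00+00'),
  ('evt-1', 'payload', TIMESTAMP '2024-01-01 10:00:07+00');

-- Returns both rows: they differ in `time`, so DISTINCT treats them as distinct
SELECT DISTINCT * FROM `mytable`;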
Deduplicate in a Query
A simple way to deduplicate is directly in your query. We recommend doing this in a separate deduplication step with a common table expression to keep your actual query clean.
-- Step 1: Deduplicate
WITH mytable_deduplicated AS (
  SELECT
    * EXCEPT (row_number)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY time DESC) row_number
    FROM `mytable`
  )
  WHERE row_number = 1
)
-- Step X: Process further
SELECT *
FROM mytable_deduplicated
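If you only need the deduplicated rows, BigQuery's QUALIFY clause can express the same filter without the nested subquery (same `id` and `time` columns as above):

SELECT *
FROM `mytable`
WHERE TRUE  -- satisfies BigQuery's requirement that QUALIFY be accompanied by WHERE, GROUP BY, or HAVING
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY time DESC) = 1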
Deduplicate in a Scheduled Job (Google BigQuery specific)
Deduplication can also happen periodically with scheduled jobs. Here's the SQL code of a scheduled job in Google BigQuery.
MERGE `mytable` target
USING (
  SELECT
    * EXCEPT (row_number)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY time DESC) row_number
    FROM `mytable`
  )
  WHERE row_number = 1
    AND time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE)
) source
ON FALSE
WHEN NOT MATCHED BY SOURCE AND time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE) THEN DELETE
WHEN NOT MATCHED BY TARGET AND time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE) THEN INSERT ROW
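Because the join condition is ON FALSE, no row ever matches: every target row older than 90 minutes is deleted (NOT MATCHED BY SOURCE), and the deduplicated rows from the subquery are re-inserted (NOT MATCHED BY TARGET). Delete and insert happen within a single atomic MERGE statement, which a separate select-and-write cannot guarantee (see Learnings below). The 90-minute guard skips rows that may still sit in the streaming buffer.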
Learnings
- A select followed by a write to the same table is not atomic; the MERGE above performs the delete and re-insert in one atomic statement.
- Always include a timestamp field in the event (e.g. set via Instant.now() in Java), and set it right before sending the row to BigQuery, so the newest delivery reliably wins the ORDER BY time DESC.
- When streaming inserts are used (`bigquery.insertAll()`), rows first land in the streaming buffer, which MERGE cannot modify; it takes 30 to 90 minutes until the entries are written to columnar storage. This is why the scheduled job above only touches rows older than 90 minutes.